1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H 20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <grpc/slice_buffer.h> 25 #include "src/core/lib/gprpp/abstract.h" 26 #include "src/core/lib/gprpp/orphanable.h" 27 #include "src/core/lib/iomgr/closure.h" 28 29 /** Internal bit flag for grpc_begin_message's \a flags signaling the use of 30 * compression for the message */ 31 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) 32 /** Mask of all valid internal flags. */ 33 #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) 34 35 namespace grpc_core { 36 37 class ByteStream : public Orphanable { 38 public: ~ByteStream()39 virtual ~ByteStream() {} 40 41 // Returns true if the bytes are available immediately (in which case 42 // on_complete will not be called), or false if the bytes will be available 43 // asynchronously (in which case on_complete will be called when they 44 // are available). 45 // 46 // max_size_hint can be set as a hint as to the maximum number 47 // of bytes that would be acceptable to read. 48 virtual bool Next(size_t max_size_hint, 49 grpc_closure* on_complete) GRPC_ABSTRACT; 50 51 // Returns the next slice in the byte stream when it is available, as 52 // indicated by Next(). 53 // 54 // Once a slice is returned into *slice, it is owned by the caller. 55 virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT; 56 57 // Shuts down the byte stream. 58 // 59 // If there is a pending call to on_complete from Next(), it will be 60 // invoked with the error passed to Shutdown(). 61 // 62 // The next call to Pull() (if any) will return the error passed to 63 // Shutdown(). 64 virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT; 65 length()66 uint32_t length() const { return length_; } flags()67 uint32_t flags() const { return flags_; } 68 set_flags(uint32_t flags)69 void set_flags(uint32_t flags) { flags_ = flags; } 70 71 GRPC_ABSTRACT_BASE_CLASS 72 73 protected: ByteStream(uint32_t length,uint32_t flags)74 ByteStream(uint32_t length, uint32_t flags) 75 : length_(length), flags_(flags) {} 76 77 private: 78 const uint32_t length_; 79 uint32_t flags_; 80 }; 81 82 // 83 // SliceBufferByteStream 84 // 85 // A ByteStream that wraps a slice buffer. 86 // 87 88 class SliceBufferByteStream : public ByteStream { 89 public: 90 // Removes all slices in slice_buffer, leaving it empty. 91 SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); 92 93 ~SliceBufferByteStream(); 94 95 void Orphan() override; 96 97 bool Next(size_t max_size_hint, grpc_closure* on_complete) override; 98 grpc_error* Pull(grpc_slice* slice) override; 99 void Shutdown(grpc_error* error) override; 100 101 private: 102 grpc_slice_buffer backing_buffer_; 103 size_t cursor_ = 0; 104 grpc_error* shutdown_error_ = GRPC_ERROR_NONE; 105 }; 106 107 // 108 // CachingByteStream 109 // 110 // A ByteStream that that wraps an underlying byte stream but caches 111 // the resulting slices in a slice buffer. If an initial attempt fails 112 // without fully draining the underlying stream, a new caching stream 113 // can be created from the same underlying cache, in which case it will 114 // return whatever is in the backing buffer before continuing to read the 115 // underlying stream. 116 // 117 // NOTE: No synchronization is done, so it is not safe to have multiple 118 // CachingByteStreams simultaneously drawing from the same underlying 119 // ByteStreamCache at the same time. 120 // 121 122 class ByteStreamCache { 123 public: 124 class CachingByteStream : public ByteStream { 125 public: 126 explicit CachingByteStream(ByteStreamCache* cache); 127 128 ~CachingByteStream(); 129 130 void Orphan() override; 131 132 bool Next(size_t max_size_hint, grpc_closure* on_complete) override; 133 grpc_error* Pull(grpc_slice* slice) override; 134 void Shutdown(grpc_error* error) override; 135 136 // Resets the byte stream to the start of the underlying stream. 137 void Reset(); 138 139 private: 140 ByteStreamCache* cache_; 141 size_t cursor_ = 0; 142 size_t offset_ = 0; 143 grpc_error* shutdown_error_ = GRPC_ERROR_NONE; 144 }; 145 146 explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); 147 148 ~ByteStreamCache(); 149 150 // Must not be destroyed while still in use by a CachingByteStream. 151 void Destroy(); 152 cache_buffer()153 grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } 154 155 private: 156 OrphanablePtr<ByteStream> underlying_stream_; 157 uint32_t length_; 158 uint32_t flags_; 159 grpc_slice_buffer cache_buffer_; 160 }; 161 162 } // namespace grpc_core 163 164 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ 165