• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <grpc/slice_buffer.h>
25 #include "src/core/lib/gprpp/orphanable.h"
26 #include "src/core/lib/iomgr/closure.h"
27 
28 /** Internal bit flag for grpc_begin_message's \a flags signaling the use of
29  * compression for the message. (Does not apply for stream compression.) */
30 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
31 /** Internal bit flag for determining whether the message was compressed and had
32  * to be decompressed by the message_decompress filter. (Does not apply for
33  * stream compression.) */
34 #define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u)
35 /** Mask of all valid internal flags. */
36 #define GRPC_WRITE_INTERNAL_USED_MASK \
37   (GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED)
38 
39 namespace grpc_core {
40 
41 class ByteStream : public Orphanable {
42  public:
~ByteStream()43   ~ByteStream() override {}
44 
45   // Returns true if the bytes are available immediately (in which case
46   // on_complete will not be called), or false if the bytes will be available
47   // asynchronously (in which case on_complete will be called when they
48   // are available). Should not be called if there is no data left on the
49   // stream.
50   //
51   // max_size_hint can be set as a hint as to the maximum number
52   // of bytes that would be acceptable to read.
53   virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0;
54 
55   // Returns the next slice in the byte stream when it is available, as
56   // indicated by Next().
57   //
58   // Once a slice is returned into *slice, it is owned by the caller.
59   virtual grpc_error* Pull(grpc_slice* slice) = 0;
60 
61   // Shuts down the byte stream.
62   //
63   // If there is a pending call to on_complete from Next(), it will be
64   // invoked with the error passed to Shutdown().
65   //
66   // The next call to Pull() (if any) will return the error passed to
67   // Shutdown().
68   virtual void Shutdown(grpc_error* error) = 0;
69 
length()70   uint32_t length() const { return length_; }
flags()71   uint32_t flags() const { return flags_; }
72 
set_flags(uint32_t flags)73   void set_flags(uint32_t flags) { flags_ = flags; }
74 
75  protected:
ByteStream(uint32_t length,uint32_t flags)76   ByteStream(uint32_t length, uint32_t flags)
77       : length_(length), flags_(flags) {}
78 
79  private:
80   const uint32_t length_;
81   uint32_t flags_;
82 };
83 
84 //
85 // SliceBufferByteStream
86 //
87 // A ByteStream that wraps a slice buffer.
88 //
89 
90 class SliceBufferByteStream : public ByteStream {
91  public:
92   // Removes all slices in slice_buffer, leaving it empty.
93   SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
94 
95   ~SliceBufferByteStream() override;
96 
97   void Orphan() override;
98 
99   bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
100   grpc_error* Pull(grpc_slice* slice) override;
101   void Shutdown(grpc_error* error) override;
102 
103  private:
104   grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
105   grpc_slice_buffer backing_buffer_;
106 };
107 
108 //
109 // CachingByteStream
110 //
111 // A ByteStream that that wraps an underlying byte stream but caches
112 // the resulting slices in a slice buffer.  If an initial attempt fails
113 // without fully draining the underlying stream, a new caching stream
114 // can be created from the same underlying cache, in which case it will
115 // return whatever is in the backing buffer before continuing to read the
116 // underlying stream.
117 //
118 // NOTE: No synchronization is done, so it is not safe to have multiple
119 // CachingByteStreams simultaneously drawing from the same underlying
120 // ByteStreamCache at the same time.
121 //
122 
123 class ByteStreamCache {
124  public:
125   class CachingByteStream : public ByteStream {
126    public:
127     explicit CachingByteStream(ByteStreamCache* cache);
128 
129     ~CachingByteStream() override;
130 
131     void Orphan() override;
132 
133     bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
134     grpc_error* Pull(grpc_slice* slice) override;
135     void Shutdown(grpc_error* error) override;
136 
137     // Resets the byte stream to the start of the underlying stream.
138     void Reset();
139 
140    private:
141     ByteStreamCache* cache_;
142     size_t cursor_ = 0;
143     size_t offset_ = 0;
144     grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
145   };
146 
147   explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
148 
149   ~ByteStreamCache();
150 
151   // Must not be destroyed while still in use by a CachingByteStream.
152   void Destroy();
153 
cache_buffer()154   grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
155 
156  private:
157   OrphanablePtr<ByteStream> underlying_stream_;
158   uint32_t length_;
159   uint32_t flags_;
160   grpc_slice_buffer cache_buffer_;
161 };
162 
163 }  // namespace grpc_core
164 
165 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
166