1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/transport/byte_stream.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
28
29 #include "test/core/util/test_config.h"
30
31 #include <gtest/gtest.h>
32
33 namespace grpc_core {
34 namespace {
35
36 //
37 // SliceBufferByteStream tests
38 //
39
NotCalledClosure(void *,grpc_error *)40 void NotCalledClosure(void* /*arg*/, grpc_error* /*error*/) {
41 GPR_ASSERT(false);
42 }
43
TEST(SliceBufferByteStream,Basic)44 TEST(SliceBufferByteStream, Basic) {
45 grpc_core::ExecCtx exec_ctx;
46 // Create and populate slice buffer.
47 grpc_slice_buffer buffer;
48 grpc_slice_buffer_init(&buffer);
49 grpc_slice input[] = {
50 grpc_slice_from_static_string("foo"),
51 grpc_slice_from_static_string("bar"),
52 };
53 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
54 grpc_slice_buffer_add(&buffer, input[i]);
55 }
56 // Create byte stream.
57 SliceBufferByteStream stream(&buffer, 0);
58 grpc_slice_buffer_destroy_internal(&buffer);
59 EXPECT_EQ(6U, stream.length());
60 grpc_closure closure;
61 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
62 grpc_schedule_on_exec_ctx);
63 // Read each slice. Note that Next() always returns synchronously.
64 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
65 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
66 grpc_slice output;
67 grpc_error* error = stream.Pull(&output);
68 EXPECT_TRUE(error == GRPC_ERROR_NONE);
69 EXPECT_TRUE(grpc_slice_eq(input[i], output));
70 grpc_slice_unref_internal(output);
71 }
72 // Clean up.
73 stream.Orphan();
74 }
75
TEST(SliceBufferByteStream,Shutdown)76 TEST(SliceBufferByteStream, Shutdown) {
77 grpc_core::ExecCtx exec_ctx;
78 // Create and populate slice buffer.
79 grpc_slice_buffer buffer;
80 grpc_slice_buffer_init(&buffer);
81 grpc_slice input[] = {
82 grpc_slice_from_static_string("foo"),
83 grpc_slice_from_static_string("bar"),
84 };
85 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
86 grpc_slice_buffer_add(&buffer, input[i]);
87 }
88 // Create byte stream.
89 SliceBufferByteStream stream(&buffer, 0);
90 grpc_slice_buffer_destroy_internal(&buffer);
91 EXPECT_EQ(6U, stream.length());
92 grpc_closure closure;
93 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
94 grpc_schedule_on_exec_ctx);
95 // Read the first slice.
96 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
97 grpc_slice output;
98 grpc_error* error = stream.Pull(&output);
99 EXPECT_TRUE(error == GRPC_ERROR_NONE);
100 EXPECT_TRUE(grpc_slice_eq(input[0], output));
101 grpc_slice_unref_internal(output);
102 // Now shutdown.
103 grpc_error* shutdown_error =
104 GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
105 stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
106 // After shutdown, the next pull() should return the error.
107 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
108 error = stream.Pull(&output);
109 EXPECT_TRUE(error == shutdown_error);
110 GRPC_ERROR_UNREF(error);
111 GRPC_ERROR_UNREF(shutdown_error);
112 // Clean up.
113 stream.Orphan();
114 }
115
116 //
117 // CachingByteStream tests
118 //
119
TEST(CachingByteStream,Basic)120 TEST(CachingByteStream, Basic) {
121 grpc_core::ExecCtx exec_ctx;
122 // Create and populate slice buffer byte stream.
123 grpc_slice_buffer buffer;
124 grpc_slice_buffer_init(&buffer);
125 grpc_slice input[] = {
126 grpc_slice_from_static_string("foo"),
127 grpc_slice_from_static_string("bar"),
128 };
129 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
130 grpc_slice_buffer_add(&buffer, input[i]);
131 }
132 SliceBufferByteStream underlying_stream(&buffer, 0);
133 grpc_slice_buffer_destroy_internal(&buffer);
134 // Create cache and caching stream.
135 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
136 ByteStreamCache::CachingByteStream stream(&cache);
137 grpc_closure closure;
138 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
139 grpc_schedule_on_exec_ctx);
140 // Read each slice. Note that next() always returns synchronously,
141 // because the underlying byte stream always does.
142 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
143 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
144 grpc_slice output;
145 grpc_error* error = stream.Pull(&output);
146 EXPECT_TRUE(error == GRPC_ERROR_NONE);
147 EXPECT_TRUE(grpc_slice_eq(input[i], output));
148 grpc_slice_unref_internal(output);
149 }
150 // Clean up.
151 stream.Orphan();
152 cache.Destroy();
153 }
154
TEST(CachingByteStream,Reset)155 TEST(CachingByteStream, Reset) {
156 grpc_core::ExecCtx exec_ctx;
157 // Create and populate slice buffer byte stream.
158 grpc_slice_buffer buffer;
159 grpc_slice_buffer_init(&buffer);
160 grpc_slice input[] = {
161 grpc_slice_from_static_string("foo"),
162 grpc_slice_from_static_string("bar"),
163 };
164 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
165 grpc_slice_buffer_add(&buffer, input[i]);
166 }
167 SliceBufferByteStream underlying_stream(&buffer, 0);
168 grpc_slice_buffer_destroy_internal(&buffer);
169 // Create cache and caching stream.
170 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
171 ByteStreamCache::CachingByteStream stream(&cache);
172 grpc_closure closure;
173 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
174 grpc_schedule_on_exec_ctx);
175 // Read one slice.
176 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
177 grpc_slice output;
178 grpc_error* error = stream.Pull(&output);
179 EXPECT_TRUE(error == GRPC_ERROR_NONE);
180 EXPECT_TRUE(grpc_slice_eq(input[0], output));
181 grpc_slice_unref_internal(output);
182 // Reset the caching stream. The reads should start over from the
183 // first slice.
184 stream.Reset();
185 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
186 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
187 error = stream.Pull(&output);
188 EXPECT_TRUE(error == GRPC_ERROR_NONE);
189 EXPECT_TRUE(grpc_slice_eq(input[i], output));
190 grpc_slice_unref_internal(output);
191 }
192 // Clean up.
193 stream.Orphan();
194 cache.Destroy();
195 }
196
TEST(CachingByteStream,SharedCache)197 TEST(CachingByteStream, SharedCache) {
198 grpc_core::ExecCtx exec_ctx;
199 // Create and populate slice buffer byte stream.
200 grpc_slice_buffer buffer;
201 grpc_slice_buffer_init(&buffer);
202 grpc_slice input[] = {
203 grpc_slice_from_static_string("foo"),
204 grpc_slice_from_static_string("bar"),
205 };
206 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
207 grpc_slice_buffer_add(&buffer, input[i]);
208 }
209 SliceBufferByteStream underlying_stream(&buffer, 0);
210 grpc_slice_buffer_destroy_internal(&buffer);
211 // Create cache and two caching streams.
212 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
213 ByteStreamCache::CachingByteStream stream1(&cache);
214 ByteStreamCache::CachingByteStream stream2(&cache);
215 grpc_closure closure;
216 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
217 grpc_schedule_on_exec_ctx);
218 // Read one slice from stream1.
219 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
220 grpc_slice output;
221 grpc_error* error = stream1.Pull(&output);
222 EXPECT_TRUE(error == GRPC_ERROR_NONE);
223 EXPECT_TRUE(grpc_slice_eq(input[0], output));
224 grpc_slice_unref_internal(output);
225 // Read all slices from stream2.
226 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
227 EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
228 error = stream2.Pull(&output);
229 EXPECT_TRUE(error == GRPC_ERROR_NONE);
230 EXPECT_TRUE(grpc_slice_eq(input[i], output));
231 grpc_slice_unref_internal(output);
232 }
233 // Now read the second slice from stream1.
234 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
235 error = stream1.Pull(&output);
236 EXPECT_TRUE(error == GRPC_ERROR_NONE);
237 EXPECT_TRUE(grpc_slice_eq(input[1], output));
238 grpc_slice_unref_internal(output);
239 // Clean up.
240 stream1.Orphan();
241 stream2.Orphan();
242 cache.Destroy();
243 }
244
245 } // namespace
246 } // namespace grpc_core
247
main(int argc,char ** argv)248 int main(int argc, char** argv) {
249 ::testing::InitGoogleTest(&argc, argv);
250 grpc::testing::TestEnvironment env(argc, argv);
251 grpc_init();
252 int retval = RUN_ALL_TESTS();
253 grpc_shutdown();
254 return retval;
255 }
256