1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/transport/byte_stream.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
28
29 #include "test/core/util/test_config.h"
30
31 #include <gtest/gtest.h>
32
33 namespace grpc_core {
34 namespace {
35
36 //
37 // SliceBufferByteStream tests
38 //
39
NotCalledClosure(void * arg,grpc_error * error)40 void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); }
41
// Checks that a SliceBufferByteStream built from a two-slice buffer reports
// the combined length and yields the same slices, in order, via Next()/Pull().
TEST(SliceBufferByteStream, Basic) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, 3 bytes each.
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Load the payload into a slice buffer.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : input) {
    grpc_slice_buffer_add(&source, slice);
  }
  // Build the stream from the buffer. The local buffer is destroyed right
  // away; everything below reads from the stream only.
  SliceBufferByteStream stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  EXPECT_EQ(6U, stream.length());
  // Closure that trips an assertion if it is ever invoked.
  grpc_closure never_run;
  GRPC_CLOSURE_INIT(&never_run, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // Pull the slices back one at a time. Next() is expected to return true
  // (i.e. synchronously) on every call.
  const size_t kAllBitsSet = ~static_cast<size_t>(0);
  for (const grpc_slice& expected : input) {
    ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
    grpc_slice pulled;
    grpc_error* pull_error = stream.Pull(&pulled);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, pulled));
    grpc_slice_unref_internal(pulled);
  }
  // Release the stream.
  stream.Orphan();
}
73
// Checks that after Shutdown(error) is called on a SliceBufferByteStream, the
// following Pull() returns that same error object.
TEST(SliceBufferByteStream, Shutdown) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices, 3 bytes each.
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Load the payload into a slice buffer.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : input) {
    grpc_slice_buffer_add(&source, slice);
  }
  // Build the stream from the buffer, then drop the local buffer.
  SliceBufferByteStream stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  EXPECT_EQ(6U, stream.length());
  // Closure that trips an assertion if it is ever invoked.
  grpc_closure never_run;
  GRPC_CLOSURE_INIT(&never_run, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  const size_t kAllBitsSet = ~static_cast<size_t>(0);
  // The first slice is read normally, before any shutdown.
  ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
  grpc_slice pulled;
  grpc_error* pull_error = stream.Pull(&pulled);
  EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[0], pulled));
  grpc_slice_unref_internal(pulled);
  // Shut the stream down. An extra ref is handed to Shutdown() so the test
  // keeps its own ref for the comparison and cleanup below.
  grpc_error* shutdown_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
  stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
  // Next() must still return synchronously, and Pull() must now report the
  // shutdown error instead of the second slice.
  ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
  pull_error = stream.Pull(&pulled);
  EXPECT_TRUE(pull_error == shutdown_error);
  // Drop the ref returned by Pull() and the test's own ref.
  GRPC_ERROR_UNREF(pull_error);
  GRPC_ERROR_UNREF(shutdown_error);
  // Release the stream.
  stream.Orphan();
}
113
114 //
115 // CachingByteStream tests
116 //
117
// Checks that a CachingByteStream layered over a ByteStreamCache yields the
// same slices, in order, as the underlying SliceBufferByteStream holds.
TEST(CachingByteStream, Basic) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices.
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Build the underlying slice-buffer stream, then drop the local buffer.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : input) {
    grpc_slice_buffer_add(&source, slice);
  }
  SliceBufferByteStream underlying_stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  // Wrap the underlying stream in a cache and read through a caching stream.
  // NOTE(review): the OrphanablePtr wraps a stack object; presumably
  // orphaning a SliceBufferByteStream does not delete it -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream(&cache);
  // Closure that trips an assertion if it is ever invoked.
  grpc_closure never_run;
  GRPC_CLOSURE_INIT(&never_run, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // Pull the slices back one at a time. Next() is expected to return
  // synchronously because the underlying byte stream always does.
  const size_t kAllBitsSet = ~static_cast<size_t>(0);
  for (const grpc_slice& expected : input) {
    ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
    grpc_slice pulled;
    grpc_error* pull_error = stream.Pull(&pulled);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, pulled));
    grpc_slice_unref_internal(pulled);
  }
  // Release the caching stream first, then the cache.
  stream.Orphan();
  cache.Destroy();
}
152
// Checks that Reset() on a CachingByteStream rewinds it: after one slice has
// been read, a reset makes the stream yield every slice again from the start.
TEST(CachingByteStream, Reset) {
  grpc_core::ExecCtx exec_ctx;
  // Expected payload: two static slices.
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  // Build the underlying slice-buffer stream, then drop the local buffer.
  grpc_slice_buffer source;
  grpc_slice_buffer_init(&source);
  for (const grpc_slice& slice : input) {
    grpc_slice_buffer_add(&source, slice);
  }
  SliceBufferByteStream underlying_stream(&source, 0);
  grpc_slice_buffer_destroy_internal(&source);
  // Wrap the underlying stream in a cache and read through a caching stream.
  // NOTE(review): the OrphanablePtr wraps a stack object; presumably
  // orphaning a SliceBufferByteStream does not delete it -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream(&cache);
  // Closure that trips an assertion if it is ever invoked.
  grpc_closure never_run;
  GRPC_CLOSURE_INIT(&never_run, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  const size_t kAllBitsSet = ~static_cast<size_t>(0);
  // Consume only the first slice.
  ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
  grpc_slice pulled;
  grpc_error* pull_error = stream.Pull(&pulled);
  EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[0], pulled));
  grpc_slice_unref_internal(pulled);
  // Rewind. Reads must now start over from the first slice and cover the
  // whole payload.
  stream.Reset();
  for (const grpc_slice& expected : input) {
    ASSERT_TRUE(stream.Next(kAllBitsSet, &never_run));
    pull_error = stream.Pull(&pulled);
    EXPECT_TRUE(pull_error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(expected, pulled));
    grpc_slice_unref_internal(pulled);
  }
  // Release the caching stream first, then the cache.
  stream.Orphan();
  cache.Destroy();
}
194
// Checks that two CachingByteStreams sharing one ByteStreamCache keep
// independent read positions: stream1 reads one slice, stream2 reads
// everything, and stream1 can then still read the second slice.
TEST(CachingByteStream, SharedCache) {
  grpc_core::ExecCtx exec_ctx;
  // Create and populate slice buffer byte stream.
  grpc_slice_buffer buffer;
  grpc_slice_buffer_init(&buffer);
  grpc_slice input[] = {
      grpc_slice_from_static_string("foo"),
      grpc_slice_from_static_string("bar"),
  };
  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
    grpc_slice_buffer_add(&buffer, input[i]);
  }
  SliceBufferByteStream underlying_stream(&buffer, 0);
  grpc_slice_buffer_destroy_internal(&buffer);
  // Create cache and two caching streams.
  // NOTE(review): the OrphanablePtr wraps a stack object; presumably
  // orphaning a SliceBufferByteStream does not delete it -- confirm.
  ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
  ByteStreamCache::CachingByteStream stream1(&cache);
  ByteStreamCache::CachingByteStream stream2(&cache);
  // Closure that trips an assertion if it is ever invoked.
  grpc_closure closure;
  GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
                    grpc_schedule_on_exec_ctx);
  // Next() results are checked with ASSERT_TRUE, as in the other tests in
  // this file, so that Pull() is never attempted on a stream that did not
  // report data as synchronously available.
  // Read one slice from stream1.
  ASSERT_TRUE(stream1.Next(~static_cast<size_t>(0), &closure));
  grpc_slice output;
  grpc_error* error = stream1.Pull(&output);
  EXPECT_TRUE(error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[0], output));
  grpc_slice_unref_internal(output);
  // Read all slices from stream2.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
    ASSERT_TRUE(stream2.Next(~static_cast<size_t>(0), &closure));
    error = stream2.Pull(&output);
    EXPECT_TRUE(error == GRPC_ERROR_NONE);
    EXPECT_TRUE(grpc_slice_eq(input[i], output));
    grpc_slice_unref_internal(output);
  }
  // Now read the second slice from stream1; its position must be unaffected
  // by the reads performed through stream2.
  ASSERT_TRUE(stream1.Next(~static_cast<size_t>(0), &closure));
  error = stream1.Pull(&output);
  EXPECT_TRUE(error == GRPC_ERROR_NONE);
  EXPECT_TRUE(grpc_slice_eq(input[1], output));
  grpc_slice_unref_internal(output);
  // Clean up: release both caching streams, then the cache.
  stream1.Orphan();
  stream2.Orphan();
  cache.Destroy();
}
242
243 } // namespace
244 } // namespace grpc_core
245
main(int argc,char ** argv)246 int main(int argc, char** argv) {
247 grpc_init();
248 grpc_test_init(argc, argv);
249 ::testing::InitGoogleTest(&argc, argv);
250 int retval = RUN_ALL_TESTS();
251 grpc_shutdown();
252 return retval;
253 }
254