1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/combiner.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/thd.h"
27 #include "test/core/util/test_config.h"
28
test_no_op(void)29 static void test_no_op(void) {
30 gpr_log(GPR_DEBUG, "test_no_op");
31 grpc_core::ExecCtx exec_ctx;
32 GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op");
33 }
34
set_event_to_true(void * value,grpc_error * error)35 static void set_event_to_true(void* value, grpc_error* error) {
36 gpr_event_set(static_cast<gpr_event*>(value), (void*)1);
37 }
38
test_execute_one(void)39 static void test_execute_one(void) {
40 gpr_log(GPR_DEBUG, "test_execute_one");
41
42 grpc_combiner* lock = grpc_combiner_create();
43 gpr_event done;
44 gpr_event_init(&done);
45 grpc_core::ExecCtx exec_ctx;
46 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done,
47 grpc_combiner_scheduler(lock)),
48 GRPC_ERROR_NONE);
49 grpc_core::ExecCtx::Get()->Flush();
50 GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
51 nullptr);
52 GRPC_COMBINER_UNREF(lock, "test_execute_one");
53 }
54
55 typedef struct {
56 size_t ctr;
57 grpc_combiner* lock;
58 gpr_event done;
59 } thd_args;
60
61 typedef struct {
62 size_t* ctr;
63 size_t value;
64 } ex_args;
65
check_one(void * a,grpc_error * error)66 static void check_one(void* a, grpc_error* error) {
67 ex_args* args = static_cast<ex_args*>(a);
68 GPR_ASSERT(*args->ctr == args->value - 1);
69 *args->ctr = args->value;
70 gpr_free(a);
71 }
72
execute_many_loop(void * a)73 static void execute_many_loop(void* a) {
74 thd_args* args = static_cast<thd_args*>(a);
75 grpc_core::ExecCtx exec_ctx;
76 size_t n = 1;
77 for (size_t i = 0; i < 10; i++) {
78 for (size_t j = 0; j < 10000; j++) {
79 ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
80 c->ctr = &args->ctr;
81 c->value = n++;
82 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(
83 check_one, c, grpc_combiner_scheduler(args->lock)),
84 GRPC_ERROR_NONE);
85 grpc_core::ExecCtx::Get()->Flush();
86 }
87 // sleep for a little bit, to test a combiner draining and another thread
88 // picking it up
89 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
90 }
91 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done,
92 grpc_combiner_scheduler(args->lock)),
93 GRPC_ERROR_NONE);
94 }
95
test_execute_many(void)96 static void test_execute_many(void) {
97 gpr_log(GPR_DEBUG, "test_execute_many");
98
99 grpc_combiner* lock = grpc_combiner_create();
100 grpc_core::Thread thds[100];
101 thd_args ta[GPR_ARRAY_SIZE(thds)];
102 for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
103 ta[i].ctr = 0;
104 ta[i].lock = lock;
105 gpr_event_init(&ta[i].done);
106 thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
107 thds[i].Start();
108 }
109 for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
110 GPR_ASSERT(gpr_event_wait(&ta[i].done,
111 gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr);
112 thds[i].Join();
113 }
114 grpc_core::ExecCtx exec_ctx;
115 GRPC_COMBINER_UNREF(lock, "test_execute_many");
116 }
117
118 static gpr_event got_in_finally;
119
in_finally(void * arg,grpc_error * error)120 static void in_finally(void* arg, grpc_error* error) {
121 gpr_event_set(&got_in_finally, (void*)1);
122 }
123
add_finally(void * arg,grpc_error * error)124 static void add_finally(void* arg, grpc_error* error) {
125 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg,
126 grpc_combiner_finally_scheduler(
127 static_cast<grpc_combiner*>(arg))),
128 GRPC_ERROR_NONE);
129 }
130
test_execute_finally(void)131 static void test_execute_finally(void) {
132 gpr_log(GPR_DEBUG, "test_execute_finally");
133
134 grpc_combiner* lock = grpc_combiner_create();
135 grpc_core::ExecCtx exec_ctx;
136 gpr_event_init(&got_in_finally);
137 GRPC_CLOSURE_SCHED(
138 GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)),
139 GRPC_ERROR_NONE);
140 grpc_core::ExecCtx::Get()->Flush();
141 GPR_ASSERT(gpr_event_wait(&got_in_finally,
142 grpc_timeout_seconds_to_deadline(5)) != nullptr);
143 GRPC_COMBINER_UNREF(lock, "test_execute_finally");
144 }
145
main(int argc,char ** argv)146 int main(int argc, char** argv) {
147 grpc_test_init(argc, argv);
148 grpc_init();
149 test_no_op();
150 test_execute_one();
151 test_execute_finally();
152 test_execute_many();
153 grpc_shutdown();
154
155 return 0;
156 }
157