• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20    using that endpoint. Because of various transitive includes in uv.h,
21    including windows.h on Windows, uv.h must be included before other system
22    headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24 
25 #include "test/core/util/passthru_endpoint.h"
26 
27 #include <inttypes.h>
28 #include <string.h>
29 
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/string_util.h>
32 #include "src/core/lib/iomgr/sockaddr.h"
33 
34 #include "src/core/lib/slice/slice_internal.h"
35 
36 typedef struct passthru_endpoint passthru_endpoint;
37 
38 typedef struct {
39   grpc_endpoint base;
40   passthru_endpoint* parent;
41   grpc_slice_buffer read_buffer;
42   grpc_slice_buffer* on_read_out;
43   grpc_closure* on_read;
44   grpc_resource_user* resource_user;
45 } half;
46 
47 struct passthru_endpoint {
48   gpr_mu mu;
49   int halves;
50   grpc_passthru_endpoint_stats* stats;
51   bool shutdown;
52   half client;
53   half server;
54 };
55 
me_read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb)56 static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
57                     grpc_closure* cb) {
58   half* m = reinterpret_cast<half*>(ep);
59   gpr_mu_lock(&m->parent->mu);
60   if (m->parent->shutdown) {
61     GRPC_CLOSURE_SCHED(
62         cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
63   } else if (m->read_buffer.count > 0) {
64     grpc_slice_buffer_swap(&m->read_buffer, slices);
65     GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
66   } else {
67     m->on_read = cb;
68     m->on_read_out = slices;
69   }
70   gpr_mu_unlock(&m->parent->mu);
71 }
72 
other_half(half * h)73 static half* other_half(half* h) {
74   if (h == &h->parent->client) return &h->parent->server;
75   return &h->parent->client;
76 }
77 
me_write(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)78 static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
79                      grpc_closure* cb, void* arg) {
80   half* m = other_half(reinterpret_cast<half*>(ep));
81   gpr_mu_lock(&m->parent->mu);
82   grpc_error* error = GRPC_ERROR_NONE;
83   gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
84   if (m->parent->shutdown) {
85     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
86   } else if (m->on_read != nullptr) {
87     for (size_t i = 0; i < slices->count; i++) {
88       grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
89     }
90     GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
91     m->on_read = nullptr;
92   } else {
93     for (size_t i = 0; i < slices->count; i++) {
94       grpc_slice_buffer_add(&m->read_buffer,
95                             grpc_slice_copy(slices->slices[i]));
96     }
97   }
98   gpr_mu_unlock(&m->parent->mu);
99   GRPC_CLOSURE_SCHED(cb, error);
100 }
101 
me_add_to_pollset(grpc_endpoint * ep,grpc_pollset * pollset)102 static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
103 
me_add_to_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)104 static void me_add_to_pollset_set(grpc_endpoint* ep,
105                                   grpc_pollset_set* pollset) {}
106 
me_delete_from_pollset_set(grpc_endpoint * ep,grpc_pollset_set * pollset)107 static void me_delete_from_pollset_set(grpc_endpoint* ep,
108                                        grpc_pollset_set* pollset) {}
109 
me_shutdown(grpc_endpoint * ep,grpc_error * why)110 static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
111   half* m = reinterpret_cast<half*>(ep);
112   gpr_mu_lock(&m->parent->mu);
113   m->parent->shutdown = true;
114   if (m->on_read) {
115     GRPC_CLOSURE_SCHED(
116         m->on_read,
117         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
118     m->on_read = nullptr;
119   }
120   m = other_half(m);
121   if (m->on_read) {
122     GRPC_CLOSURE_SCHED(
123         m->on_read,
124         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
125     m->on_read = nullptr;
126   }
127   gpr_mu_unlock(&m->parent->mu);
128   grpc_resource_user_shutdown(m->resource_user);
129   GRPC_ERROR_UNREF(why);
130 }
131 
me_destroy(grpc_endpoint * ep)132 static void me_destroy(grpc_endpoint* ep) {
133   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
134   gpr_mu_lock(&p->mu);
135   if (0 == --p->halves) {
136     gpr_mu_unlock(&p->mu);
137     gpr_mu_destroy(&p->mu);
138     grpc_passthru_endpoint_stats_destroy(p->stats);
139     grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
140     grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
141     grpc_resource_user_unref(p->client.resource_user);
142     grpc_resource_user_unref(p->server.resource_user);
143     gpr_free(p);
144   } else {
145     gpr_mu_unlock(&p->mu);
146   }
147 }
148 
me_get_peer(grpc_endpoint * ep)149 static char* me_get_peer(grpc_endpoint* ep) {
150   passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
151   return (reinterpret_cast<half*>(ep)) == &p->client
152              ? gpr_strdup("fake:mock_client_endpoint")
153              : gpr_strdup("fake:mock_server_endpoint");
154 }
155 
me_get_fd(grpc_endpoint * ep)156 static int me_get_fd(grpc_endpoint* ep) { return -1; }
157 
me_get_resource_user(grpc_endpoint * ep)158 static grpc_resource_user* me_get_resource_user(grpc_endpoint* ep) {
159   half* m = reinterpret_cast<half*>(ep);
160   return m->resource_user;
161 }
162 
163 static const grpc_endpoint_vtable vtable = {
164     me_read,
165     me_write,
166     me_add_to_pollset,
167     me_add_to_pollset_set,
168     me_delete_from_pollset_set,
169     me_shutdown,
170     me_destroy,
171     me_get_resource_user,
172     me_get_peer,
173     me_get_fd,
174 };
175 
half_init(half * m,passthru_endpoint * parent,grpc_resource_quota * resource_quota,const char * half_name)176 static void half_init(half* m, passthru_endpoint* parent,
177                       grpc_resource_quota* resource_quota,
178                       const char* half_name) {
179   m->base.vtable = &vtable;
180   m->parent = parent;
181   grpc_slice_buffer_init(&m->read_buffer);
182   m->on_read = nullptr;
183   char* name;
184   gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
185                (intptr_t)parent);
186   m->resource_user = grpc_resource_user_create(resource_quota, name);
187   gpr_free(name);
188 }
189 
grpc_passthru_endpoint_create(grpc_endpoint ** client,grpc_endpoint ** server,grpc_resource_quota * resource_quota,grpc_passthru_endpoint_stats * stats)190 void grpc_passthru_endpoint_create(grpc_endpoint** client,
191                                    grpc_endpoint** server,
192                                    grpc_resource_quota* resource_quota,
193                                    grpc_passthru_endpoint_stats* stats) {
194   passthru_endpoint* m =
195       static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
196   m->halves = 2;
197   m->shutdown = 0;
198   if (stats == nullptr) {
199     m->stats = grpc_passthru_endpoint_stats_create();
200   } else {
201     gpr_ref(&stats->refs);
202     m->stats = stats;
203   }
204   half_init(&m->client, m, resource_quota, "client");
205   half_init(&m->server, m, resource_quota, "server");
206   gpr_mu_init(&m->mu);
207   *client = &m->client.base;
208   *server = &m->server.base;
209 }
210 
grpc_passthru_endpoint_stats_create()211 grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create() {
212   grpc_passthru_endpoint_stats* stats =
213       static_cast<grpc_passthru_endpoint_stats*>(
214           gpr_malloc(sizeof(grpc_passthru_endpoint_stats)));
215   memset(stats, 0, sizeof(*stats));
216   gpr_ref_init(&stats->refs, 1);
217   return stats;
218 }
219 
grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats * stats)220 void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
221   if (gpr_unref(&stats->refs)) {
222     gpr_free(stats);
223   }
224 }
225