1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_END2END_FIXTURES_SOCKPAIR_FIXTURE_H 16 #define GRPC_TEST_CORE_END2END_FIXTURES_SOCKPAIR_FIXTURE_H 17 18 #include <grpc/grpc.h> 19 #include <grpc/impl/channel_arg_names.h> 20 #include <grpc/status.h> 21 22 #include <utility> 23 24 #include "absl/functional/any_invocable.h" 25 #include "absl/log/check.h" 26 #include "absl/status/status.h" 27 #include "absl/status/statusor.h" 28 #include "gtest/gtest.h" 29 #include "src/core/channelz/channelz.h" 30 #include "src/core/config/core_configuration.h" 31 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 32 #include "src/core/lib/channel/channel_args.h" 33 #include "src/core/lib/channel/channel_args_preconditioning.h" 34 #include "src/core/lib/iomgr/endpoint.h" 35 #include "src/core/lib/iomgr/endpoint_pair.h" 36 #include "src/core/lib/iomgr/error.h" 37 #include "src/core/lib/iomgr/exec_ctx.h" 38 #include "src/core/lib/surface/channel.h" 39 #include "src/core/lib/surface/channel_create.h" 40 #include "src/core/lib/surface/channel_stack_type.h" 41 #include "src/core/lib/surface/completion_queue.h" 42 #include "src/core/lib/transport/transport.h" 43 #include "src/core/server/server.h" 44 #include "src/core/util/ref_counted_ptr.h" 45 #include "test/core/end2end/end2end_tests.h" 46 47 namespace grpc_core { 48 49 class SockpairFixture : public CoreTestFixture { 50 public: SockpairFixture(const ChannelArgs & ep_args)51 explicit SockpairFixture(const ChannelArgs& ep_args) 52 : ep_(grpc_iomgr_create_endpoint_pair("fixture", ep_args.ToC().get())) {} 53 ~SockpairFixture()54 ~SockpairFixture() override { 55 ExecCtx exec_ctx; 56 if (ep_.client != nullptr) { 57 grpc_endpoint_destroy(ep_.client); 58 } 59 if (ep_.server != nullptr) { 60 grpc_endpoint_destroy(ep_.server); 61 } 62 } 63 64 private: MutateClientArgs(ChannelArgs args)65 virtual ChannelArgs MutateClientArgs(ChannelArgs args) { return args; } MutateServerArgs(ChannelArgs args)66 virtual ChannelArgs MutateServerArgs(ChannelArgs args) { return args; } MakeServer(const ChannelArgs & in_args,grpc_completion_queue * cq,absl::AnyInvocable<void (grpc_server *)> & pre_server_start)67 grpc_server* MakeServer( 68 const ChannelArgs& in_args, grpc_completion_queue* cq, 69 absl::AnyInvocable<void(grpc_server*)>& pre_server_start) override { 70 auto args = MutateServerArgs(in_args); 71 ExecCtx exec_ctx; 72 Transport* transport; 73 auto* server = grpc_server_create(args.ToC().get(), nullptr); 74 grpc_server_register_completion_queue(server, cq, nullptr); 75 pre_server_start(server); 76 grpc_server_start(server); 77 auto server_channel_args = CoreConfiguration::Get() 78 .channel_args_preconditioning() 79 .PreconditionChannelArgs(args.ToC().get()); 80 OrphanablePtr<grpc_endpoint> server_endpoint( 81 std::exchange(ep_.server, nullptr)); 82 EXPECT_NE(server_endpoint, nullptr); 83 grpc_endpoint_add_to_pollset(server_endpoint.get(), grpc_cq_pollset(cq)); 84 transport = grpc_create_chttp2_transport(server_channel_args, 85 std::move(server_endpoint), false); 86 Server* core_server = Server::FromC(server); 87 grpc_error_handle error = core_server->SetupTransport( 88 transport, nullptr, core_server->channel_args(), nullptr); 89 if (error.ok()) { 90 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr, 91 nullptr); 92 } else { 93 transport->Orphan(); 94 } 95 return server; 96 } MakeClient(const ChannelArgs & in_args,grpc_completion_queue *)97 grpc_channel* MakeClient(const ChannelArgs& in_args, 98 grpc_completion_queue*) override { 99 ExecCtx exec_ctx; 100 auto args = CoreConfiguration::Get() 101 .channel_args_preconditioning() 102 .PreconditionChannelArgs( 103 MutateClientArgs(in_args) 104 .Set(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority") 105 .ToC() 106 .get()); 107 Transport* transport; 108 OrphanablePtr<grpc_endpoint> client_endpoint( 109 std::exchange(ep_.client, nullptr)); 110 EXPECT_NE(client_endpoint, nullptr); 111 transport = 112 grpc_create_chttp2_transport(args, std::move(client_endpoint), true); 113 auto channel = ChannelCreate("socketpair-target", args, 114 GRPC_CLIENT_DIRECT_CHANNEL, transport); 115 grpc_channel* client; 116 if (channel.ok()) { 117 client = channel->release()->c_ptr(); 118 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr, 119 nullptr); 120 } else { 121 client = grpc_lame_client_channel_create( 122 nullptr, static_cast<grpc_status_code>(channel.status().code()), 123 "lame channel"); 124 transport->Orphan(); 125 } 126 CHECK(client); 127 return client; 128 } 129 130 grpc_endpoint_pair ep_; 131 }; 132 } // namespace grpc_core 133 134 #endif // GRPC_TEST_CORE_END2END_FIXTURES_SOCKPAIR_FIXTURE_H 135