1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H 18 #define GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H 19 20 #include <grpc/grpc.h> 21 #include <grpc/slice.h> 22 #include <grpc/status.h> 23 #include <grpc/support/port_platform.h> 24 25 #include <functional> 26 #include <memory> 27 #include <string> 28 29 #include "absl/container/flat_hash_map.h" 30 #include "absl/status/status.h" 31 #include "src/core/lib/channel/channel_args.h" 32 #include "src/core/lib/iomgr/closure.h" 33 #include "src/core/lib/iomgr/error.h" 34 #include "src/core/lib/iomgr/iomgr_fwd.h" 35 #include "src/core/lib/surface/channel.h" 36 #include "src/core/util/orphanable.h" 37 #include "src/core/util/ref_counted_ptr.h" 38 #include "src/core/util/sync.h" 39 #include "src/core/xds/xds_client/xds_bootstrap.h" 40 #include "src/core/xds/xds_client/xds_transport.h" 41 42 namespace grpc_core { 43 44 class GrpcXdsTransportFactory final : public XdsTransportFactory { 45 public: 46 class GrpcXdsTransport; 47 48 explicit GrpcXdsTransportFactory(const ChannelArgs& args); 49 ~GrpcXdsTransportFactory() override; 50 Orphaned()51 void Orphaned() override {} 52 53 RefCountedPtr<XdsTransport> GetTransport( 54 const XdsBootstrap::XdsServer& server, absl::Status* status) override; 55 interested_parties()56 grpc_pollset_set* interested_parties() const { return interested_parties_; } 57 58 private: 59 ChannelArgs args_; 60 grpc_pollset_set* interested_parties_; 61 62 Mutex mu_; 63 absl::flat_hash_map<std::string /*XdsServer key*/, GrpcXdsTransport*> 64 transports_ ABSL_GUARDED_BY(&mu_); 65 }; 66 67 class GrpcXdsTransportFactory::GrpcXdsTransport final 68 : public XdsTransportFactory::XdsTransport { 69 public: 70 class GrpcStreamingCall; 71 72 GrpcXdsTransport(WeakRefCountedPtr<GrpcXdsTransportFactory> factory, 73 const XdsBootstrap::XdsServer& server, absl::Status* status); 74 ~GrpcXdsTransport() override; 75 76 void Orphaned() override; 77 78 void StartConnectivityFailureWatch( 79 RefCountedPtr<ConnectivityFailureWatcher> watcher) override; 80 void StopConnectivityFailureWatch( 81 const RefCountedPtr<ConnectivityFailureWatcher>& watcher) override; 82 83 OrphanablePtr<StreamingCall> CreateStreamingCall( 84 const char* method, 85 std::unique_ptr<StreamingCall::EventHandler> event_handler) override; 86 87 void ResetBackoff() override; 88 89 private: 90 class StateWatcher; 91 92 WeakRefCountedPtr<GrpcXdsTransportFactory> factory_; 93 std::string key_; 94 RefCountedPtr<Channel> channel_; 95 96 Mutex mu_; 97 absl::flat_hash_map<RefCountedPtr<ConnectivityFailureWatcher>, StateWatcher*> 98 watchers_ ABSL_GUARDED_BY(&mu_); 99 }; 100 101 class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall final 102 : public XdsTransportFactory::XdsTransport::StreamingCall { 103 public: 104 GrpcStreamingCall(WeakRefCountedPtr<GrpcXdsTransportFactory> factory, 105 Channel* channel, const char* method, 106 std::unique_ptr<StreamingCall::EventHandler> event_handler); 107 ~GrpcStreamingCall() override; 108 109 void Orphan() override; 110 111 void SendMessage(std::string payload) override; 112 113 void StartRecvMessage() override; 114 115 private: 116 static void OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/); 117 static void OnRequestSent(void* arg, grpc_error_handle error); 118 static void OnResponseReceived(void* arg, grpc_error_handle /*error*/); 119 static void OnStatusReceived(void* arg, grpc_error_handle /*error*/); 120 121 WeakRefCountedPtr<GrpcXdsTransportFactory> factory_; 122 123 std::unique_ptr<StreamingCall::EventHandler> event_handler_; 124 125 // Always non-NULL. 126 grpc_call* call_; 127 128 // recv_initial_metadata 129 grpc_metadata_array initial_metadata_recv_; 130 grpc_closure on_recv_initial_metadata_; 131 132 // send_message 133 grpc_byte_buffer* send_message_payload_ = nullptr; 134 grpc_closure on_request_sent_; 135 136 // recv_message 137 grpc_byte_buffer* recv_message_payload_ = nullptr; 138 grpc_closure on_response_received_; 139 140 // recv_trailing_metadata 141 grpc_metadata_array trailing_metadata_recv_; 142 grpc_status_code status_code_; 143 grpc_slice status_details_; 144 grpc_closure on_status_received_; 145 }; 146 147 } // namespace grpc_core 148 149 #endif // GRPC_SRC_CORE_XDS_GRPC_XDS_TRANSPORT_GRPC_H 150