1 // 2 // Copyright 2018 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H 18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H 19 20 #include <grpc/event_engine/event_engine.h> 21 #include <grpc/event_engine/memory_allocator.h> 22 #include <grpc/slice.h> 23 #include <grpc/status.h> 24 #include <grpc/support/port_platform.h> 25 26 #include <atomic> 27 #include <memory> 28 29 #include "absl/base/thread_annotations.h" 30 #include "absl/status/status.h" 31 #include "absl/strings/string_view.h" 32 #include "absl/types/optional.h" 33 #include "src/core/client_channel/subchannel.h" 34 #include "src/core/lib/iomgr/call_combiner.h" 35 #include "src/core/lib/iomgr/closure.h" 36 #include "src/core/lib/iomgr/error.h" 37 #include "src/core/lib/iomgr/iomgr_fwd.h" 38 #include "src/core/lib/iomgr/polling_entity.h" 39 #include "src/core/lib/resource_quota/arena.h" 40 #include "src/core/lib/resource_quota/memory_quota.h" 41 #include "src/core/lib/slice/slice.h" 42 #include "src/core/lib/slice/slice_buffer.h" 43 #include "src/core/lib/transport/metadata_batch.h" 44 #include "src/core/lib/transport/transport.h" 45 #include "src/core/util/backoff.h" 46 #include "src/core/util/orphanable.h" 47 #include "src/core/util/ref_counted_ptr.h" 48 #include "src/core/util/sync.h" 49 50 namespace grpc_core { 51 52 // Represents a streaming call on a subchannel that should be maintained 53 // open at all times. 54 // If the call fails with UNIMPLEMENTED, no further attempts are made. 55 // If the call fails with any other status (including OK), we retry the 56 // call with appropriate backoff. 57 // The backoff state is reset when we receive a message on a stream. 58 // 59 // Currently, this assumes server-side streaming, but it could be extended 60 // to support full bidi streaming if there is a need in the future. 61 class SubchannelStreamClient final 62 : public InternallyRefCounted<SubchannelStreamClient> { 63 public: 64 // Interface implemented by caller. Thread safety is provided for the 65 // implementation; only one method will be called by any thread at any 66 // one time (including destruction). 67 // 68 // The address of the SubchannelStreamClient object is passed to most 69 // methods for logging purposes. 70 class CallEventHandler { 71 public: 72 virtual ~CallEventHandler() = default; 73 74 // Returns the path for the streaming call. 75 virtual Slice GetPathLocked() 76 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 77 // Called when a new call attempt is being started. 78 virtual void OnCallStartLocked(SubchannelStreamClient* client) 79 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 80 // Called when a previous call attempt has failed and the retry 81 // timer is started before the next attempt. 82 virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client) 83 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 84 // Returns the message payload to send from the client. 85 virtual grpc_slice EncodeSendMessageLocked() 86 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 87 // Called whenever a message is received from the server. 88 virtual absl::Status RecvMessageReadyLocked( 89 SubchannelStreamClient* client, absl::string_view serialized_message) 90 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 91 // Called when a stream fails. 92 virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client, 93 grpc_status_code status) 94 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; 95 }; 96 97 // If tracer is non-null, it enables trace logging, with the specified 98 // string being the first part of the log message. 99 // Does not take ownership of interested_parties; the caller is responsible 100 // for ensuring that it will outlive the SubchannelStreamClient. 101 SubchannelStreamClient( 102 RefCountedPtr<ConnectedSubchannel> connected_subchannel, 103 grpc_pollset_set* interested_parties, 104 std::unique_ptr<CallEventHandler> event_handler, const char* tracer); 105 106 ~SubchannelStreamClient() override; 107 108 void Orphan() override; 109 110 private: 111 // Contains a call to the backend and all the data related to the call. 112 class CallState final : public Orphanable { 113 public: 114 CallState(RefCountedPtr<SubchannelStreamClient> client, 115 grpc_pollset_set* interested_parties); 116 ~CallState() override; 117 118 void Orphan() override; 119 120 void StartCallLocked() 121 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_); 122 123 private: 124 void Cancel(); 125 126 void StartBatch(grpc_transport_stream_op_batch* batch); 127 static void StartBatchInCallCombiner(void* arg, grpc_error_handle error); 128 129 void CallEndedLocked(bool retry) 130 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_->mu_); 131 132 void RecvMessageReady(); 133 134 static void OnComplete(void* arg, grpc_error_handle error); 135 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); 136 static void RecvMessageReady(void* arg, grpc_error_handle error); 137 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 138 static void StartCancel(void* arg, grpc_error_handle error); 139 static void OnCancelComplete(void* arg, grpc_error_handle error); 140 141 static void AfterCallStackDestruction(void* arg, grpc_error_handle error); 142 143 RefCountedPtr<SubchannelStreamClient> subchannel_stream_client_; 144 grpc_polling_entity pollent_; 145 146 RefCountedPtr<Arena> arena_; 147 CallCombiner call_combiner_; 148 149 // The streaming call to the backend. Always non-null. 150 // Refs are tracked manually; when the last ref is released, the 151 // CallState object will be automatically destroyed. 152 SubchannelCall* call_; 153 154 grpc_transport_stream_op_batch_payload payload_; 155 grpc_transport_stream_op_batch batch_; 156 grpc_transport_stream_op_batch recv_message_batch_; 157 grpc_transport_stream_op_batch recv_trailing_metadata_batch_; 158 159 grpc_closure on_complete_; 160 161 // send_initial_metadata 162 grpc_metadata_batch send_initial_metadata_; 163 164 // send_message 165 SliceBuffer send_message_; 166 167 // send_trailing_metadata 168 grpc_metadata_batch send_trailing_metadata_; 169 170 // recv_initial_metadata 171 grpc_metadata_batch recv_initial_metadata_; 172 grpc_closure recv_initial_metadata_ready_; 173 174 // recv_message 175 absl::optional<SliceBuffer> recv_message_; 176 grpc_closure recv_message_ready_; 177 std::atomic<bool> seen_response_{false}; 178 179 // True if the cancel_stream batch has been started. 180 std::atomic<bool> cancelled_{false}; 181 182 // recv_trailing_metadata 183 grpc_metadata_batch recv_trailing_metadata_; 184 grpc_transport_stream_stats collect_stats_; 185 grpc_closure recv_trailing_metadata_ready_; 186 187 // Closure for call stack destruction. 188 grpc_closure after_call_stack_destruction_; 189 }; 190 191 void StartCall(); 192 void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 193 194 void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 195 void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_); 196 197 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 198 grpc_pollset_set* interested_parties_; // Do not own. 199 const char* tracer_; 200 RefCountedPtr<CallArenaAllocator> call_allocator_; 201 202 Mutex mu_; 203 std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_); 204 205 // The data associated with the current health check call. It holds a ref 206 // to this SubchannelStreamClient object. 207 OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_); 208 209 // Call retry state. 210 BackOff retry_backoff_ ABSL_GUARDED_BY(mu_); 211 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> 212 retry_timer_handle_ ABSL_GUARDED_BY(mu_); 213 // A raw pointer will suffice since connected_subchannel_ holds a copy of the 214 // ChannelArgs which holds an std::shared_ptr of the EventEngine. 215 grpc_event_engine::experimental::EventEngine* event_engine_ 216 ABSL_GUARDED_BY(mu_); 217 }; 218 219 } // namespace grpc_core 220 221 #endif // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H 222