1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/core/test_util/mock_endpoint.h"
20
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/sync.h>
24
25 #include <memory>
26
27 #include "absl/log/check.h"
28 #include "absl/status/status.h"
29 #include "absl/strings/string_view.h"
30 #include "src/core/lib/event_engine/tcp_socket_utils.h"
31 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
32 #include "src/core/util/down_cast.h"
33
34 namespace grpc_event_engine {
35 namespace experimental {
36
MockEndpoint()37 MockEndpoint::MockEndpoint()
38 : peer_addr_(URIToResolvedAddress("ipv4:127.0.0.1:12345").value()),
39 local_addr_(URIToResolvedAddress("ipv4:127.0.0.1:6789").value()) {}
40
~MockEndpointController()41 MockEndpointController::~MockEndpointController() {
42 grpc_core::MutexLock lock(&mu_);
43 if (on_read_) {
44 engine_->Run([cb = std::move(on_read_)]() mutable {
45 cb(absl::InternalError("Endpoint Shutdown"));
46 });
47 on_read_ = nullptr;
48 }
49 }
50
Create(std::shared_ptr<EventEngine> engine)51 std::shared_ptr<MockEndpointController> MockEndpointController::Create(
52 std::shared_ptr<EventEngine> engine) {
53 return std::shared_ptr<MockEndpointController>(
54 new MockEndpointController(std::move(engine)));
55 }
56
MockEndpointController(std::shared_ptr<EventEngine> engine)57 MockEndpointController::MockEndpointController(
58 std::shared_ptr<EventEngine> engine)
59 : engine_(std::move(engine)),
60 mock_grpc_endpoint_(grpc_event_engine_endpoint_create(
61 std::make_unique<grpc_event_engine::experimental::MockEndpoint>())) {}
62
TriggerReadEvent(Slice read_data)63 void MockEndpointController::TriggerReadEvent(Slice read_data) {
64 grpc_core::MutexLock lock(&mu_);
65 CHECK(!reads_done_)
66 << "Cannot trigger a read event after NoMoreReads has been called.";
67 if (on_read_) {
68 on_read_slice_buffer_->Append(std::move(read_data));
69 engine_->Run(
70 [cb = std::move(on_read_)]() mutable { cb(absl::OkStatus()); });
71 on_read_ = nullptr;
72 on_read_slice_buffer_ = nullptr;
73 } else {
74 read_buffer_.Append(std::move(read_data));
75 }
76 }
77
NoMoreReads()78 void MockEndpointController::NoMoreReads() {
79 grpc_core::MutexLock lock(&mu_);
80 CHECK(!std::exchange(reads_done_, true))
81 << "NoMoreReads() can only be called once";
82 }
83
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer)84 void MockEndpointController::Read(
85 absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) {
86 grpc_core::MutexLock lock(&mu_);
87 if (read_buffer_.Count() > 0) {
88 CHECK(buffer->Count() == 0);
89 CHECK(!on_read_);
90 read_buffer_.Swap(*buffer);
91 engine_->Run([cb = std::move(on_read)]() mutable { cb(absl::OkStatus()); });
92 } else if (reads_done_) {
93 engine_->Run([cb = std::move(on_read)]() mutable {
94 cb(absl::UnavailableError("reads done"));
95 });
96 } else {
97 on_read_ = std::move(on_read);
98 on_read_slice_buffer_ = buffer;
99 }
100 }
101
TakeCEndpoint()102 grpc_endpoint* MockEndpointController::TakeCEndpoint() {
103 CHECK_NE(mock_grpc_endpoint_, nullptr)
104 << "The endpoint has already been taken";
105 grpc_core::DownCast<MockEndpoint*>(
106 grpc_get_wrapped_event_engine_endpoint(mock_grpc_endpoint_))
107 ->SetController(shared_from_this());
108 auto ret = mock_grpc_endpoint_;
109 mock_grpc_endpoint_ = nullptr;
110 return ret;
111 }
112
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)113 bool MockEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
114 SliceBuffer* buffer, const ReadArgs* /* args */) {
115 endpoint_control_->Read(std::move(on_read), buffer);
116 return false;
117 }
118
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)119 bool MockEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
120 SliceBuffer* data, const WriteArgs* /* args */) {
121 // No-op implementation. Nothing was using it.
122 data->Clear();
123 endpoint_control_->engine()->Run(
124 [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
125 return false;
126 }
127
GetPeerAddress() const128 const EventEngine::ResolvedAddress& MockEndpoint::GetPeerAddress() const {
129 return peer_addr_;
130 }
131
GetLocalAddress() const132 const EventEngine::ResolvedAddress& MockEndpoint::GetLocalAddress() const {
133 return local_addr_;
134 }
135
136 } // namespace experimental
137 } // namespace grpc_event_engine
138