// // // Copyright 2023 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include "test/core/end2end/fuzzers/fuzzing_common.h" #include #include #include #include "absl/strings/str_cat.h" #include "absl/types/optional.h" #include #include #include #include #include #include #include #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/experiments/config.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" #include "test/core/end2end/fuzzers/api_fuzzer.pb.h" namespace grpc_core { namespace { int force_experiments = []() { ForceEnableExperiment("event_engine_client", true); ForceEnableExperiment("event_engine_listener", true); return 1; }(); } // namespace namespace testing { static void free_non_null(void* p) { GPR_ASSERT(p != nullptr); gpr_free(p); } enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED }; class Call : public std::enable_shared_from_this { public: explicit Call(CallType type) : type_(type) { grpc_metadata_array_init(&recv_initial_metadata_); grpc_metadata_array_init(&recv_trailing_metadata_); grpc_call_details_init(&call_details_); } ~Call(); CallType type() const { return type_; } bool done() const { if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) { return true; } if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true; return false; } void Shutdown() { if (call_ != nullptr) { grpc_call_cancel(call_, nullptr); type_ = CallType::TOMBSTONED; } } void SetCall(grpc_call* call) { GPR_ASSERT(call_ == nullptr); call_ = call; } grpc_call* call() const { return call_; } void RequestCall(grpc_server* server, grpc_completion_queue* cq) { auto* v = FinishedRequestCall(); grpc_call_error error = grpc_server_request_call( server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v); if (error != GRPC_CALL_OK) { v->Run(false); } } void* Allocate(size_t size) { void* p = gpr_malloc(size); free_pointers_.push_back(p); return p; } template T* AllocArray(size_t elems) { return static_cast(Allocate(sizeof(T) * elems)); } template T* NewCopy(T value) { T* p = AllocArray(1); new (p) T(value); return p; } template grpc_slice ReadSlice(const T& s) { grpc_slice slice = grpc_slice_from_cpp_string(s.value()); unref_slices_.push_back(slice); return slice; } template grpc_metadata_array ReadMetadata(const M& metadata) { grpc_metadata* m = AllocArray(metadata.size()); for (int i = 0; i < metadata.size(); ++i) { m[i].key = ReadSlice(metadata[i].key()); m[i].value = ReadSlice(metadata[i].value()); } return grpc_metadata_array{static_cast(metadata.size()), static_cast(metadata.size()), m}; } absl::optional ReadOp( const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok, uint8_t* batch_ops, std::vector>* unwinders) { grpc_op op; memset(&op, 0, sizeof(op)); switch (batch_op.op_case()) { case api_fuzzer::BatchOp::OP_NOT_SET: // invalid value return {}; case api_fuzzer::BatchOp::kSendInitialMetadata: if (sent_initial_metadata_) { *batch_is_ok = false; } else { sent_initial_metadata_ = true; op.op = GRPC_OP_SEND_INITIAL_METADATA; *batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA; auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata()); op.data.send_initial_metadata.count = ary.count; op.data.send_initial_metadata.metadata = ary.metadata; } break; case api_fuzzer::BatchOp::kSendMessage: op.op = GRPC_OP_SEND_MESSAGE; if (send_message_ != nullptr) { *batch_is_ok = false; } else { *batch_ops |= 1 << GRPC_OP_SEND_MESSAGE; std::vector slices; for (const auto& m : batch_op.send_message().message()) { slices.push_back(ReadSlice(m)); } send_message_ = op.data.send_message.send_message = grpc_raw_byte_buffer_create(slices.data(), slices.size()); unwinders->push_back([this]() { grpc_byte_buffer_destroy(send_message_); send_message_ = nullptr; }); } break; case api_fuzzer::BatchOp::kSendCloseFromClient: op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; *batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT; break; case api_fuzzer::BatchOp::kSendStatusFromServer: { op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; *batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER; auto ary = ReadMetadata(batch_op.send_status_from_server().metadata()); op.data.send_status_from_server.trailing_metadata_count = ary.count; op.data.send_status_from_server.trailing_metadata = ary.metadata; op.data.send_status_from_server.status = static_cast( batch_op.send_status_from_server().status_code()); op.data.send_status_from_server.status_details = batch_op.send_status_from_server().has_status_details() ? NewCopy(ReadSlice( batch_op.send_status_from_server().status_details())) : nullptr; } break; case api_fuzzer::BatchOp::kReceiveInitialMetadata: if (enqueued_recv_initial_metadata_) { *batch_is_ok = false; } else { enqueued_recv_initial_metadata_ = true; op.op = GRPC_OP_RECV_INITIAL_METADATA; *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA; op.data.recv_initial_metadata.recv_initial_metadata = &recv_initial_metadata_; } break; case api_fuzzer::BatchOp::kReceiveMessage: // Allow only one active pending_recv_message_op to exist. Otherwise if // the previous enqueued recv_message_op is not complete by the time // we get here, then under certain conditions, enqueuing this op will // overwrite the internal call->receiving_buffer maintained by grpc // leading to a memory leak. if (call_closed_ || pending_recv_message_op_) { *batch_is_ok = false; } else { op.op = GRPC_OP_RECV_MESSAGE; *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE; pending_recv_message_op_ = true; op.data.recv_message.recv_message = &recv_message_; unwinders->push_back([this]() { pending_recv_message_op_ = false; }); } break; case api_fuzzer::BatchOp::kReceiveStatusOnClient: op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; op.data.recv_status_on_client.status = &status_; op.data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_; op.data.recv_status_on_client.status_details = &recv_status_details_; *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT; break; case api_fuzzer::BatchOp::kReceiveCloseOnServer: op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; *batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER; op.data.recv_close_on_server.cancelled = &cancelled_; break; } op.reserved = nullptr; op.flags = batch_op.flags(); return op; } Validator* FinishedBatchValidator(uint8_t has_ops) { ++pending_ops_; auto self = shared_from_this(); return MakeValidator([self, has_ops](bool /*success*/) { --self->pending_ops_; if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) { self->pending_recv_message_op_ = false; if (self->recv_message_ != nullptr) { grpc_byte_buffer_destroy(self->recv_message_); self->recv_message_ = nullptr; } } if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) { grpc_byte_buffer_destroy(self->send_message_); self->send_message_ = nullptr; } if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) || (has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) { self->call_closed_ = true; } }); } Validator* FinishedRequestCall() { ++pending_ops_; auto self = shared_from_this(); return MakeValidator([self](bool success) { GPR_ASSERT(self->pending_ops_ > 0); --self->pending_ops_; if (success) { GPR_ASSERT(self->call_ != nullptr); self->type_ = CallType::SERVER; } else { self->type_ = CallType::TOMBSTONED; } }); } private: CallType type_; grpc_call* call_ = nullptr; grpc_byte_buffer* recv_message_ = nullptr; grpc_status_code status_; grpc_metadata_array recv_initial_metadata_{0, 0, nullptr}; grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr}; grpc_slice recv_status_details_ = grpc_empty_slice(); // set by receive close on server, unset here to trigger // msan if misused int cancelled_; int pending_ops_ = 0; bool sent_initial_metadata_ = false; bool enqueued_recv_initial_metadata_ = false; grpc_call_details call_details_{}; grpc_byte_buffer* send_message_ = nullptr; bool call_closed_ = false; bool pending_recv_message_op_ = false; std::vector free_pointers_; std::vector unref_slices_; }; Call::~Call() { if (call_ != nullptr) { grpc_call_unref(call_); } grpc_slice_unref(recv_status_details_); grpc_call_details_destroy(&call_details_); for (auto* p : free_pointers_) { gpr_free(p); } for (auto s : unref_slices_) { grpc_slice_unref(s); } if (recv_message_ != nullptr) { grpc_byte_buffer_destroy(recv_message_); recv_message_ = nullptr; } grpc_metadata_array_destroy(&recv_initial_metadata_); grpc_metadata_array_destroy(&recv_trailing_metadata_); } namespace { Validator* ValidateConnectivityWatch(gpr_timespec deadline, int* counter) { return MakeValidator([deadline, counter](bool success) { if (!success) { auto now = gpr_now(deadline.clock_type); GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0); } --*counter; }); } } // namespace using ::grpc_event_engine::experimental::FuzzingEventEngine; using ::grpc_event_engine::experimental::GetDefaultEventEngine; using ::grpc_event_engine::experimental::SetEventEngineFactory; BasicFuzzer::BasicFuzzer(const fuzzing_event_engine::Actions& actions) : engine_([actions]() { SetEventEngineFactory( [actions]() -> std::unique_ptr< grpc_event_engine::experimental::EventEngine> { return std::make_unique( FuzzingEventEngine::Options(), actions); }); return std::dynamic_pointer_cast( GetDefaultEventEngine()); }()) { grpc_timer_manager_set_start_threaded(false); grpc_init(); { ExecCtx exec_ctx; Executor::SetThreadingAll(false); } resource_quota_ = MakeResourceQuota("fuzzer"); cq_ = grpc_completion_queue_create_for_next(nullptr); } BasicFuzzer::~BasicFuzzer() { GPR_ASSERT(ActiveCall() == nullptr); GPR_ASSERT(calls_.empty()); engine_->TickUntilIdle(); grpc_completion_queue_shutdown(cq_); GPR_ASSERT(PollCq() == Result::kComplete); grpc_completion_queue_destroy(cq_); grpc_shutdown_blocking(); engine_->UnsetGlobalHooks(); } void BasicFuzzer::Tick() { engine_->Tick(); grpc_timer_manager_tick(); } BasicFuzzer::Result BasicFuzzer::PollCq() { grpc_event ev = grpc_completion_queue_next( cq_, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); switch (ev.type) { case GRPC_OP_COMPLETE: { static_cast(ev.tag)->Run(ev.success); break; } case GRPC_QUEUE_TIMEOUT: break; case GRPC_QUEUE_SHUTDOWN: return Result::kComplete; } return Result::kPending; } BasicFuzzer::Result BasicFuzzer::CheckConnectivity(bool try_to_connect) { if (channel() != nullptr) { grpc_channel_check_connectivity_state(channel(), try_to_connect); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::WatchConnectivity(uint32_t duration_us) { if (channel() != nullptr) { grpc_connectivity_state st = grpc_channel_check_connectivity_state(channel(), 0); if (st != GRPC_CHANNEL_SHUTDOWN) { gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(duration_us, GPR_TIMESPAN)); grpc_channel_watch_connectivity_state( channel(), st, deadline, cq_, ValidateConnectivityWatch(deadline, &pending_channel_watches_)); pending_channel_watches_++; } } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::CancelAllCallsIfShutdown() { if (server() != nullptr && server_shutdown_) { grpc_server_cancel_all_calls(server()); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ShutdownServer() { if (server() != nullptr) { grpc_server_shutdown_and_notify( server(), cq_, AssertSuccessAndDecrement(&pending_server_shutdowns_)); pending_server_shutdowns_++; server_shutdown_ = true; } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::DestroyServerIfReady() { if (server() != nullptr && server_shutdown_ && pending_server_shutdowns_ == 0) { DestroyServer(); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::CreateCall( const api_fuzzer::CreateCall& create_call) { bool ok = true; if (channel() == nullptr) ok = false; // If the active call is a server call, then use it as the parent call // to exercise the propagation logic. Call* parent_call = ActiveCall(); if (parent_call != nullptr && parent_call->type() != CallType::SERVER) { parent_call = nullptr; } calls_.emplace_back(new Call(CallType::CLIENT)); grpc_slice method = calls_.back()->ReadSlice(create_call.method()); if (GRPC_SLICE_LENGTH(method) == 0) { ok = false; } grpc_slice host = calls_.back()->ReadSlice(create_call.host()); gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(create_call.timeout(), GPR_TIMESPAN)); if (ok) { calls_.back()->SetCall(grpc_channel_create_call( channel(), parent_call == nullptr ? nullptr : parent_call->call(), create_call.propagation_mask(), cq_, method, &host, deadline, nullptr)); } else { calls_.pop_back(); return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ChangeActiveCall() { active_call_++; ActiveCall(); return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::QueueBatchForActiveCall( const api_fuzzer::Batch& queue_batch) { auto* active_call = ActiveCall(); if (active_call == nullptr || active_call->type() == CallType::PENDING_SERVER || active_call->call() == nullptr) { return Result::kFailed; } const auto& batch = queue_batch.operations(); if (batch.size() > 6) { return Result::kFailed; } std::vector ops; bool ok = true; uint8_t has_ops = 0; std::vector> unwinders; for (const auto& batch_op : batch) { auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders); if (!op.has_value()) continue; ops.push_back(*op); } if (channel() == nullptr) ok = false; if (ok) { auto* v = active_call->FinishedBatchValidator(has_ops); grpc_call_error error = grpc_call_start_batch( active_call->call(), ops.data(), ops.size(), v, nullptr); if (error != GRPC_CALL_OK) { v->Run(false); } } else { for (auto& unwind : unwinders) { unwind(); } return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::CancelActiveCall() { auto* active_call = ActiveCall(); if (active_call != nullptr && active_call->call() != nullptr) { grpc_call_cancel(active_call->call(), nullptr); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::Pause(Duration duration) { ++paused_; engine()->RunAfterExactly(duration, [this]() { --paused_; }); return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ServerRequestCall() { if (server() == nullptr) { return Result::kFailed; } calls_.emplace_back(new Call(CallType::PENDING_SERVER)); calls_.back()->RequestCall(server(), cq_); return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::DestroyActiveCall() { auto* active_call = ActiveCall(); if (active_call != nullptr && active_call->type() != CallType::PENDING_SERVER && active_call->call() != nullptr) { calls_[active_call_]->Shutdown(); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ValidatePeerForActiveCall() { auto* active_call = ActiveCall(); if (active_call != nullptr && active_call->call() != nullptr) { free_non_null(grpc_call_get_peer(active_call->call())); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ValidateChannelTarget() { if (channel() != nullptr) { free_non_null(grpc_channel_get_target(channel())); } else { return Result::kFailed; } return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::ResizeResourceQuota( uint32_t resize_resource_quota) { ExecCtx exec_ctx; resource_quota_->memory_quota()->SetSize(resize_resource_quota); return Result::kComplete; } BasicFuzzer::Result BasicFuzzer::CloseChannel() { if (channel() != nullptr) { DestroyChannel(); } else { return Result::kFailed; } return Result::kComplete; } Call* BasicFuzzer::ActiveCall() { while (!calls_.empty()) { if (active_call_ >= calls_.size()) { active_call_ = 0; } if (calls_[active_call_] != nullptr && !calls_[active_call_]->done()) { return calls_[active_call_].get(); } calls_.erase(calls_.begin() + active_call_); } return nullptr; } void BasicFuzzer::ShutdownCalls() { for (auto& call : calls_) { if (call == nullptr) continue; if (call->type() == CallType::PENDING_SERVER) continue; call->Shutdown(); } } bool BasicFuzzer::Continue() { return channel() != nullptr || server() != nullptr || pending_channel_watches_ > 0 || ActiveCall() != nullptr || paused_; } BasicFuzzer::Result BasicFuzzer::ExecuteAction( const api_fuzzer::Action& action) { gpr_log(GPR_DEBUG, "EXECUTE_ACTION: %s", action.DebugString().c_str()); switch (action.type_case()) { case api_fuzzer::Action::TYPE_NOT_SET: return BasicFuzzer::Result::kFailed; // tickle completion queue case api_fuzzer::Action::kPollCq: return PollCq(); // create an insecure channel case api_fuzzer::Action::kCreateChannel: return CreateChannel(action.create_channel()); // destroy a channel case api_fuzzer::Action::kCloseChannel: return CloseChannel(); // bring up a server case api_fuzzer::Action::kCreateServer: return CreateServer(action.create_server()); // begin server shutdown case api_fuzzer::Action::kShutdownServer: return ShutdownServer(); // cancel all calls if server is shutdown case api_fuzzer::Action::kCancelAllCallsIfShutdown: return CancelAllCallsIfShutdown(); // destroy server case api_fuzzer::Action::kDestroyServerIfReady: return DestroyServerIfReady(); // check connectivity case api_fuzzer::Action::kCheckConnectivity: return CheckConnectivity(action.check_connectivity()); // watch connectivity case api_fuzzer::Action::kWatchConnectivity: return WatchConnectivity(action.watch_connectivity()); // create a call case api_fuzzer::Action::kCreateCall: return CreateCall(action.create_call()); // switch the 'current' call case api_fuzzer::Action::kChangeActiveCall: return ChangeActiveCall(); // queue some ops on a call case api_fuzzer::Action::kQueueBatch: return QueueBatchForActiveCall(action.queue_batch()); // cancel current call case api_fuzzer::Action::kCancelCall: return CancelActiveCall(); // get a calls peer case api_fuzzer::Action::kGetPeer: return ValidatePeerForActiveCall(); // get a channels target case api_fuzzer::Action::kGetTarget: return ValidateChannelTarget(); // send a ping on a channel case api_fuzzer::Action::kPing: // Ping is no longer a part of the API return BasicFuzzer::Result::kNotSupported; // enable a tracer case api_fuzzer::Action::kEnableTracer: { grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1); break; } // disable a tracer case api_fuzzer::Action::kDisableTracer: { grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0); break; } // request a server call case api_fuzzer::Action::kRequestCall: return ServerRequestCall(); // destroy a call case api_fuzzer::Action::kDestroyCall: return DestroyActiveCall(); // resize the buffer pool case api_fuzzer::Action::kResizeResourceQuota: return ResizeResourceQuota(action.resize_resource_quota()); case api_fuzzer::Action::kSleepMs: return Pause(std::min(Duration::Milliseconds(action.sleep_ms()), Duration::Minutes(1))); default: Crash(absl::StrCat("Unsupported Fuzzing Action of type: ", action.type_case())); } return BasicFuzzer::Result::kComplete; } void BasicFuzzer::TryShutdown() { engine()->FuzzingDone(); if (channel() != nullptr) { DestroyChannel(); } if (server() != nullptr) { if (!server_shutdown_called()) { ShutdownServer(); } if (server_finished_shutting_down()) { DestroyServer(); } } ShutdownCalls(); grpc_timer_manager_tick(); GPR_ASSERT(PollCq() == Result::kPending); } void BasicFuzzer::Run(absl::Span actions) { size_t action_index = 0; auto allow_forced_shutdown = std::make_shared(false); auto no_more_actions = [&]() { action_index = actions.size(); }; engine()->RunAfterExactly(minimum_run_time_, [allow_forced_shutdown] { *allow_forced_shutdown = true; }); while (action_index < actions.size() || Continue()) { Tick(); if (paused_) continue; if (action_index == actions.size()) { if (*allow_forced_shutdown) TryShutdown(); continue; } auto result = ExecuteAction(*actions[action_index++]); if (result == Result::kFailed) { no_more_actions(); } } } } // namespace testing } // namespace grpc_core