/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/cloud_trace_processor/orchestrator_impl.h"

// NOTE(review): the four directives below carry no header name, and several
// template argument lists further down (e.g. on base::Future, base::StatusOr,
// std::vector, std::unique_ptr) appear to be missing in this copy of the
// file. Restore them from version control before attempting to build.
#include
#include
#include
#include

#include "perfetto/base/status.h"
#include "perfetto/ext/base/flat_hash_map.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/stream.h"
#include "perfetto/ext/cloud_trace_processor/worker.h"
#include "protos/perfetto/cloud_trace_processor/common.pb.h"
#include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h"
#include "protos/perfetto/cloud_trace_processor/worker.pb.h"
#include "src/trace_processor/util/status_macros.h"

namespace perfetto {
namespace cloud_trace_processor {
namespace {

// Drops the payload of a response and forwards only its status. Used as the
// per-shard mapping step in OrchestratorImpl::TracePoolCreate.
base::Future CreateResponseToStatus(
    base::StatusOr response_or) {
  return response_or.status();
}

// Drops the payload of a response and forwards only its status. Used as the
// per-shard mapping step in OrchestratorImpl::TracePoolSetTraces.
base::Future SetTracesResponseToStatus(
    base::StatusOr response_or) {
  return response_or.status();
}

// Converts one per-shard query response into a protos::TracePoolQueryResponse.
// A non-OK |resp| is propagated unchanged (via RETURN_IF_ERROR); otherwise the
// `trace` and `result` fields are transferred into the returned message.
base::Future> RpcResponseToPoolResponse(
    base::StatusOr resp) {
  RETURN_IF_ERROR(resp.status());
  protos::TracePoolQueryResponse ret;
  // NOTE(review): if trace() returns a const reference (as protobuf-generated
  // getters normally do), std::move here silently falls back to a copy;
  // `*resp->mutable_trace()` may have been intended, mirroring the `result`
  // line below. Confirm against the generated header.
  ret.set_trace(std::move(resp->trace()));
  *ret.mutable_result() = std::move(*resp->mutable_result());
  return ret;
}

// Distributes |traces| over |workers| in round-robin order and issues one
// TracePoolShardSetTraces call per worker, returning the flattened stream of
// all workers' responses.
//
// Every worker receives a call, including workers whose request ends up with
// no traces (the second loop iterates over all of |protos|).
//
// NOTE(review): when |workers| is empty and |traces| is not, the loop below
// indexes an empty vector and evaluates `% workers.size()` with a zero
// divisor. Confirm that callers guarantee at least one worker.
base::StatusOrStream
RoundRobinSetTraces(const std::vector>& workers,
                    const std::vector& traces) {
  uint32_t worker_idx = 0;
  // One request per worker, filled in below.
  std::vector protos;
  protos.resize(workers.size());
  for (const auto& trace : traces) {
    protos[worker_idx].add_traces(trace);
    // Advance to the next worker, wrapping around after the last one.
    worker_idx = (worker_idx + 1) % workers.size();
  }

  using ShardResponse = protos::TracePoolShardSetTracesResponse;
  std::vector> streams;
  for (uint32_t i = 0; i < protos.size(); ++i) {
    streams.emplace_back(workers[i]->TracePoolShardSetTraces(protos[i]));
  }
  return base::FlattenStreams(std::move(streams));
}

}  // namespace

// Defaulted destructor, defined out of line in this translation unit.
Orchestrator::~Orchestrator() = default;

// Builds an OrchestratorImpl that takes over |workers| and returns it to the
// caller.
std::unique_ptr Orchestrator::CreateInProcess(
    std::vector> workers) {
  return std::unique_ptr(
      new OrchestratorImpl(std::move(workers)));
}

// Stores |workers| in |workers_|; every pool operation below fans out to them.
OrchestratorImpl::OrchestratorImpl(std::vector> workers)
    : workers_(std::move(workers)) {}

// Creates a new trace pool and asks every worker to create its shard of it.
//
// Fails immediately with an error status when:
//  - the requested pool type is not SHARED,
//  - no shared pool name was supplied, or
//  - a pool with the derived id ("shared:" + name) is already registered.
// Otherwise the returned future resolves once all workers have answered; the
// pool is only recorded in |pools_| if the collected status is OK.
base::StatusOrFuture
OrchestratorImpl::TracePoolCreate(const protos::TracePoolCreateArgs& args) {
  if (args.pool_type() != protos::TracePoolType::SHARED) {
    return base::StatusOr(
        base::ErrStatus("Currently only SHARED pools are supported"));
  }
  if (!args.has_shared_pool_name()) {
    return base::StatusOr(
        base::ErrStatus("Pool name must be provided for SHARED pools"));
  }
  std::string id = "shared:" + args.shared_pool_name();
  TracePool* exist = pools_.Find(id);
  if (exist) {
    return base::StatusOr(
        base::ErrStatus("Pool %s already exists", id.c_str()));
  }
  protos::TracePoolShardCreateArgs group_args;
  group_args.set_pool_id(id);
  group_args.set_pool_type(args.pool_type());

  using ShardResponse = protos::TracePoolShardCreateResponse;
  std::vector> shards;
  for (uint32_t i = 0; i < workers_.size(); ++i) {
    // Each worker call yields a future; wrap it as a stream so that all
    // workers can be merged with FlattenStreams below.
    shards.emplace_back(
        base::StreamFromFuture(workers_[i]->TracePoolShardCreate(group_args)));
  }
  // Reduce every shard response to its status, then collapse them into one.
  // (AllOkCollector is defined elsewhere; presumably it yields OK only when
  // every element is OK — confirm.)
  return base::FlattenStreams(std::move(shards))
      .MapFuture(&CreateResponseToStatus)
      .Collect(base::AllOkCollector())
      .ContinueWith(
          // NOTE(review): captures `this`; the continuation must not run
          // after this object has been destroyed — confirm the lifetime
          // guarantees of the returned future.
          [this, id](base::StatusOr resp)
              -> base::StatusOrFuture {
            RETURN_IF_ERROR(resp.status());
            // The existence check above ran before the asynchronous work, so
            // the insert result is checked again here rather than assumed.
            auto it_and_inserted = pools_.Insert(id, TracePool());
            if (!it_and_inserted.second) {
              return base::ErrStatus("Unable to insert pool %s", id.c_str());
            }
            return protos::TracePoolCreateResponse();
          });
}

// Assigns the set of traces backing an existing pool and distributes them to
// the workers in round-robin order.
//
// Fails immediately when the pool id is unknown or when the pool already has
// traces recorded (incremental updates are rejected). Otherwise the returned
// future resolves to an empty response once the collected worker status is
// OK, or to that error status.
base::StatusOrFuture
OrchestratorImpl::TracePoolSetTraces(
    const protos::TracePoolSetTracesArgs& args) {
  std::string id = args.pool_id();
  TracePool* pool = pools_.Find(id);
  if (!pool) {
    return base::StatusOr(
        base::ErrStatus("Unable to find pool %s", id.c_str()));
  }
  if (!pool->loaded_traces.empty()) {
    return base::StatusOr(base::ErrStatus(
        "Incrementally adding/removing items to pool not currently supported"));
  }
  // NOTE(review): the pool's trace list is populated before any worker has
  // acknowledged the request. If a worker later reports an error, the list
  // stays populated and a retry is rejected by the emptiness check above —
  // confirm this is intended.
  pool->loaded_traces.assign(args.traces().begin(), args.traces().end());
  return RoundRobinSetTraces(workers_, pool->loaded_traces)
      .MapFuture(&SetTracesResponseToStatus)
      .Collect(base::AllOkCollector())
      .ContinueWith(
          [](base::Status status)
              -> base::StatusOrFuture {
            RETURN_IF_ERROR(status);
            return protos::TracePoolSetTracesResponse();
          });
}

// Runs a SQL query against an existing pool by forwarding it to every worker.
//
// Returns a single-element error stream when the pool id is unknown.
// Otherwise returns the merged stream of all workers' responses, each
// converted by RpcResponseToPoolResponse.
base::StatusOrStream
OrchestratorImpl::TracePoolQuery(const protos::TracePoolQueryArgs& args) {
  TracePool* pool = pools_.Find(args.pool_id());
  if (!pool) {
    return base::StreamOf(base::StatusOr(
        base::ErrStatus("Unable to find pool %s", args.pool_id().c_str())));
  }
  // The same request (pool id + query text) is sent to every worker.
  protos::TracePoolShardQueryArgs shard_args;
  *shard_args.mutable_pool_id() = args.pool_id();
  *shard_args.mutable_sql_query() = args.sql_query();

  using ShardResponse = protos::TracePoolShardQueryResponse;
  std::vector> streams;
  for (uint32_t i = 0; i < workers_.size(); ++i) {
    streams.emplace_back(workers_[i]->TracePoolShardQuery(shard_args));
  }
  return base::FlattenStreams(std::move(streams))
      .MapFuture(&RpcResponseToPoolResponse);
}

// Destroys an existing pool: asks every worker to destroy its shard and, if
// the collected status is OK, removes the pool from |pools_|.
//
// Fails immediately when the pool id is unknown. If the collected worker
// status is an error, it is returned and the pool stays registered.
base::StatusOrFuture
OrchestratorImpl::TracePoolDestroy(const protos::TracePoolDestroyArgs& args) {
  std::string id = args.pool_id();
  TracePool* pool = pools_.Find(id);
  if (!pool) {
    return base::StatusOr(
        base::ErrStatus("Unable to find pool %s", id.c_str()));
  }
  protos::TracePoolShardDestroyArgs shard_args;
  *shard_args.mutable_pool_id() = id;

  using ShardResponse = protos::TracePoolShardDestroyResponse;
  std::vector> streams;
  for (uint32_t i = 0; i < workers_.size(); ++i) {
    // Wrap each worker's future as a stream so they can be merged below.
    streams.emplace_back(
        base::StreamFromFuture(workers_[i]->TracePoolShardDestroy(shard_args)));
  }
  return base::FlattenStreams(std::move(streams))
      .MapFuture(
          // Only the status of each shard response is of interest.
          [](base::StatusOr resp) -> base::Future {
            return resp.status();
          })
      .Collect(base::AllOkCollector())
      .ContinueWith(
          // NOTE(review): captures `this`; see the lifetime remark in
          // TracePoolCreate.
          [this, id](base::Status status)
              -> base::StatusOrFuture {
            RETURN_IF_ERROR(status);
            // NOTE(review): the lookup above happened before the asynchronous
            // work. If the pool could be erased in between (e.g. two
            // overlapping destroy requests for the same id), this CHECK would
            // abort the process — confirm that cannot happen.
            PERFETTO_CHECK(pools_.Erase(id));
            return protos::TracePoolDestroyResponse();
          });
}

}  // namespace cloud_trace_processor
}  // namespace perfetto