1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/trace_writer_impl.h"
18
19 #include <string.h>
20
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24
25 #include "perfetto/base/logging.h"
26 #include "perfetto/protozero/proto_utils.h"
27 #include "src/tracing/core/shared_memory_arbiter_impl.h"
28
29 #include "perfetto/trace/trace_packet.pbzero.h"
30
31 using protozero::proto_utils::kMessageLengthFieldSize;
32 using protozero::proto_utils::WriteRedundantVarInt;
33 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
34
35 namespace perfetto {
36
37 namespace {
38 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
39 } // namespace
40
TraceWriterImpl(SharedMemoryArbiterImpl * shmem_arbiter,WriterID id,BufferID target_buffer)41 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
42 WriterID id,
43 BufferID target_buffer)
44 : shmem_arbiter_(shmem_arbiter),
45 id_(id),
46 target_buffer_(target_buffer),
47 protobuf_stream_writer_(this) {
48 // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
49 // more gracefully and always return a no-op TracePacket in NewTracePacket().
50 PERFETTO_CHECK(id_ != 0);
51
52 cur_packet_.reset(new protos::pbzero::TracePacket());
53 cur_packet_->Finalize(); // To avoid the DCHECK in NewTracePacket().
54 }
55
~TraceWriterImpl()56 TraceWriterImpl::~TraceWriterImpl() {
57 if (cur_chunk_.is_valid()) {
58 cur_packet_->Finalize();
59 Flush();
60 }
61 shmem_arbiter_->ReleaseWriterID(id_);
62 }
63
Flush(std::function<void ()> callback)64 void TraceWriterImpl::Flush(std::function<void()> callback) {
65 // Flush() cannot be called in the middle of a TracePacket.
66 PERFETTO_CHECK(cur_packet_->is_finalized());
67
68 if (cur_chunk_.is_valid()) {
69 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
70 &patch_list_);
71 shmem_arbiter_->FlushPendingCommitDataRequests(callback);
72 } else {
73 PERFETTO_DCHECK(patch_list_.empty());
74 }
75 protobuf_stream_writer_.Reset({nullptr, nullptr});
76 }
77
NewTracePacket()78 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
79 // If we hit this, the caller is calling NewTracePacket() without having
80 // finalized the previous packet.
81 PERFETTO_DCHECK(cur_packet_->is_finalized());
82
83 fragmenting_packet_ = false;
84
85 // Reserve space for the size of the message. Note: this call might re-enter
86 // into this class invoking GetNewBuffer() if there isn't enough space or if
87 // this is the very first call to NewTracePacket().
88 static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
89 "The packet header must match the Message header size");
90
91 // It doesn't make sense to begin a packet that is going to fragment
92 // immediately after (8 is just an arbitrary estimation on the minimum size of
93 // a realistic packet).
94 if (protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8)
95 protobuf_stream_writer_.Reset(GetNewBuffer());
96
97 cur_packet_->Reset(&protobuf_stream_writer_);
98 uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
99 memset(header, 0, kPacketHeaderSize);
100 cur_packet_->set_size_field(header);
101 cur_chunk_.IncrementPacketCount();
102 TracePacketHandle handle(cur_packet_.get());
103 cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
104 fragmenting_packet_ = true;
105 return handle;
106 }
107
108 // Called by the Message. We can get here in two cases:
109 // 1. In the middle of writing a Message,
110 // when |fragmenting_packet_| == true. In this case we want to update the
111 // chunk header with a partial packet and start a new partial packet in the
112 // new chunk.
113 // 2. While calling ReserveBytes() for the packet header in NewTracePacket().
114 // In this case |fragmenting_packet_| == false and we just want a new chunk
115 // without creating any fragments.
GetNewBuffer()116 protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
117 if (fragmenting_packet_) {
118 uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
119 PERFETTO_DCHECK(wptr >= cur_fragment_start_);
120 uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
121 PERFETTO_DCHECK(partial_size < cur_chunk_.size());
122
123 // Backfill the packet header with the fragment size.
124 PERFETTO_DCHECK(partial_size > 0);
125 cur_packet_->inc_size_already_written(partial_size);
126 cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
127 WriteRedundantVarInt(partial_size, cur_packet_->size_field());
128
129 // Descend in the stack of non-finalized nested submessages (if any) and
130 // detour their |size_field| into the |patch_list_|. At this point we have
131 // to release the chunk and they cannot write anymore into that.
132 // TODO(primiano): add tests to cover this logic.
133 for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
134 nested_msg = nested_msg->nested_message()) {
135 uint8_t* const cur_hdr = nested_msg->size_field();
136
137 // If this is false the protozero Message has already been instructed to
138 // write, upon Finalize(), its size into the patch list.
139 bool size_field_points_within_chunk =
140 cur_hdr >= cur_chunk_.payload_begin() &&
141 cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
142
143 if (size_field_points_within_chunk) {
144 auto offset =
145 static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin());
146 const ChunkID cur_chunk_id =
147 cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
148 Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
149 nested_msg->set_size_field(&patch->size_field[0]);
150 } else {
151 #if PERFETTO_DCHECK_IS_ON()
152 // Ensure that the size field of the message points to an element of the
153 // patch list.
154 auto patch_it = std::find_if(
155 patch_list_.begin(), patch_list_.end(),
156 [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
157 PERFETTO_DCHECK(patch_it != patch_list_.end());
158 #endif
159 }
160 } // for(nested_msg
161 } // if(fragmenting_packet)
162
163 if (cur_chunk_.is_valid()) {
164 // ReturnCompletedChunk will consume the first patched entries from
165 // |patch_list_| and shrink it.
166 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
167 &patch_list_);
168 }
169
170 // Start a new chunk.
171
172 ChunkHeader::Packets packets = {};
173 if (fragmenting_packet_) {
174 packets.count = 1;
175 packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
176 }
177
178 // The memory order of the stores below doesn't really matter. This |header|
179 // is just a local temporary object. The GetNewChunk() call below will copy it
180 // into the shared buffer with the proper barriers.
181 ChunkHeader header = {};
182 header.writer_id.store(id_, std::memory_order_relaxed);
183 header.chunk_id.store(next_chunk_id_++, std::memory_order_relaxed);
184 header.packets.store(packets, std::memory_order_relaxed);
185
186 cur_chunk_ = shmem_arbiter_->GetNewChunk(header);
187 uint8_t* payload_begin = cur_chunk_.payload_begin();
188 if (fragmenting_packet_) {
189 cur_packet_->set_size_field(payload_begin);
190 memset(payload_begin, 0, kPacketHeaderSize);
191 payload_begin += kPacketHeaderSize;
192 cur_fragment_start_ = payload_begin;
193 }
194
195 return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
196 }
197
writer_id() const198 WriterID TraceWriterImpl::writer_id() const {
199 return id_;
200 }
201
202 // Base class ctor/dtor definition.
203 TraceWriter::TraceWriter() = default;
204 TraceWriter::~TraceWriter() = default;
205
206 } // namespace perfetto
207