1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/perfetto_cmd/packet_writer.h"
18
19 #include <array>
20
21 #include <fcntl.h>
22 #include <getopt.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <zlib.h>
28
29 #include "perfetto/base/paged_memory.h"
30 #include "perfetto/protozero/proto_utils.h"
31 #include "perfetto/tracing/core/trace_packet.h"
32
33 namespace perfetto {
34 namespace {
35
36 using protozero::proto_utils::kMessageLengthFieldSize;
37 using protozero::proto_utils::MakeTagLengthDelimited;
38 using protozero::proto_utils::WriteRedundantVarInt;
39 using protozero::proto_utils::WriteVarInt;
40 using Preamble = std::array<char, 16>;
41
42 // ID of the |packet| field in trace.proto. Hardcoded as this we don't
43 // want to depend on protos/trace:lite for binary size saving reasons.
44 constexpr uint32_t kPacketId = 1;
45 // ID of |compressed_packets| in trace_packet.proto.
46 constexpr uint32_t kCompressedPacketsId = 50;
47
48 // Maximum allowable size for a single packet.
49 const size_t kMaxPacketSize = 500 * 1024;
50 // After every kPendingBytesLimit we do a Z_SYNC_FLUSH in the zlib stream.
51 const size_t kPendingBytesLimit = 32 * 1024;
52
53 template <uint32_t id>
GetPreamble(size_t sz,Preamble * preamble)54 size_t GetPreamble(size_t sz, Preamble* preamble) {
55 uint8_t* ptr = reinterpret_cast<uint8_t*>(preamble->data());
56 constexpr uint32_t tag = MakeTagLengthDelimited(id);
57 ptr = WriteVarInt(tag, ptr);
58 ptr = WriteVarInt(sz, ptr);
59 size_t preamble_size = reinterpret_cast<uintptr_t>(ptr) -
60 reinterpret_cast<uintptr_t>(preamble->data());
61 PERFETTO_DCHECK(preamble_size < preamble->size());
62 return preamble_size;
63 }
64
65 class FilePacketWriter : public PacketWriter {
66 public:
67 FilePacketWriter(FILE* fd);
68 ~FilePacketWriter() override;
69 bool WritePackets(const std::vector<TracePacket>& packets) override;
70
71 private:
72 FILE* fd_;
73 };
74
75 class ZipPacketWriter : public PacketWriter {
76 public:
77 ZipPacketWriter(std::unique_ptr<PacketWriter>);
78 ~ZipPacketWriter() override;
79 bool WritePackets(const std::vector<TracePacket>& packets) override;
80
81 private:
82 bool WritePacket(const TracePacket& packet);
83 void CheckEq(int actual_code, int expected_code);
84 bool FinalizeCompressedPacket();
Deflate(const char * ptr,size_t size)85 inline void Deflate(const char* ptr, size_t size) {
86 return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
87 }
Deflate(const void * ptr,size_t size)88 inline void Deflate(const void* ptr, size_t size) {
89 return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
90 }
91 void Deflate(const uint8_t* ptr, size_t size);
92
93 std::unique_ptr<PacketWriter> writer_;
94 z_stream stream_{};
95
96 base::PagedMemory buf_;
97 uint8_t* const start_;
98 uint8_t* const end_;
99
100 bool is_compressing_ = false;
101 size_t pending_bytes_ = 0;
102 };
103
FilePacketWriter(FILE * fd)104 FilePacketWriter::FilePacketWriter(FILE* fd) : fd_(fd) {}
105
~FilePacketWriter()106 FilePacketWriter::~FilePacketWriter() {
107 fflush(fd_);
108 }
109
WritePackets(const std::vector<TracePacket> & packets)110 bool FilePacketWriter::WritePackets(const std::vector<TracePacket>& packets) {
111 for (const TracePacket& packet : packets) {
112 Preamble preamble;
113 size_t size = GetPreamble<kPacketId>(packet.size(), &preamble);
114 if (fwrite(preamble.data(), 1, size, fd_) != size)
115 return false;
116 for (const Slice& slice : packet.slices()) {
117 if (fwrite(reinterpret_cast<const char*>(slice.start), 1, slice.size,
118 fd_) != slice.size) {
119 return false;
120 }
121 }
122 }
123
124 return true;
125 }
126
ZipPacketWriter(std::unique_ptr<PacketWriter> writer)127 ZipPacketWriter::ZipPacketWriter(std::unique_ptr<PacketWriter> writer)
128 : writer_(std::move(writer)),
129 buf_(base::PagedMemory::Allocate(kMaxPacketSize)),
130 start_(static_cast<uint8_t*>(buf_.Get())),
131 end_(start_ + buf_.size()) {}
132
~ZipPacketWriter()133 ZipPacketWriter::~ZipPacketWriter() {
134 if (is_compressing_)
135 FinalizeCompressedPacket();
136 }
137
WritePackets(const std::vector<TracePacket> & packets)138 bool ZipPacketWriter::WritePackets(const std::vector<TracePacket>& packets) {
139 for (const TracePacket& packet : packets) {
140 if (!WritePacket(packet))
141 return false;
142 }
143 return true;
144 }
145
WritePacket(const TracePacket & packet)146 bool ZipPacketWriter::WritePacket(const TracePacket& packet) {
147 // If we have already written one compressed packet, check whether we should
148 // flush the buffer.
149 if (is_compressing_) {
150 // We have two goals:
151 // - Fit as much data as possible into each packet
152 // - Ensure each packet is under 512KB
153 // We keep track of two numbers:
154 // - the number of remaining bytes in the output buffer
155 // - the number of (pending) uncompressed bytes written since the last flush
156 // The pending bytes may or may not have appeared in output buffer.
157 // Assuming in the worst case each uncompressed input byte can turn into
158 // two compressed bytes we can ensure we don't go over 512KB by not letting
159 // the number of pending bytes go over remaining bytes/2 - however often
160 // each input byte will not turn into 2 output bytes but less than 1 output
161 // byte - so this underfills the packet. To avoid this every 32kb we deflate
162 // with Z_SYNC_FLUSH ensuring all pending bytes are present in the output
163 // buffer.
164 if (pending_bytes_ > kPendingBytesLimit) {
165 CheckEq(deflate(&stream_, Z_SYNC_FLUSH), Z_OK);
166 pending_bytes_ = 0;
167 }
168
169 PERFETTO_DCHECK(end_ >= stream_.next_out);
170 size_t remaining = static_cast<size_t>(end_ - stream_.next_out);
171 if ((pending_bytes_ + packet.size() + 1024) * 2 > remaining) {
172 if (!FinalizeCompressedPacket()) {
173 return false;
174 }
175 }
176 }
177
178 // Reinitialize the compresser if needed:
179 if (!is_compressing_) {
180 memset(&stream_, 0, sizeof(stream_));
181 CheckEq(deflateInit(&stream_, 9), Z_OK);
182 is_compressing_ = true;
183 stream_.next_out = start_;
184 stream_.avail_out = static_cast<unsigned int>(end_ - start_);
185 }
186
187 // Compress the trace packet header:
188 Preamble packet_hdr;
189 size_t packet_hdr_size = GetPreamble<kPacketId>(packet.size(), &packet_hdr);
190 Deflate(packet_hdr.data(), packet_hdr_size);
191
192 // Compress the trace packet itself:
193 for (const Slice& slice : packet.slices()) {
194 Deflate(slice.start, slice.size);
195 }
196
197 return true;
198 }
199
FinalizeCompressedPacket()200 bool ZipPacketWriter::FinalizeCompressedPacket() {
201 PERFETTO_DCHECK(is_compressing_);
202
203 CheckEq(deflate(&stream_, Z_FINISH), Z_STREAM_END);
204
205 size_t size = static_cast<size_t>(stream_.next_out - start_);
206 Preamble preamble;
207 size_t preamble_size = GetPreamble<kCompressedPacketsId>(size, &preamble);
208
209 std::vector<TracePacket> out_packets(1);
210 TracePacket& out_packet = out_packets[0];
211 out_packet.AddSlice(preamble.data(), preamble_size);
212 out_packet.AddSlice(start_, size);
213
214 if (!writer_->WritePackets(out_packets))
215 return false;
216
217 is_compressing_ = false;
218 pending_bytes_ = 0;
219 CheckEq(deflateEnd(&stream_), Z_OK);
220 return true;
221 }
222
CheckEq(int actual_code,int expected_code)223 void ZipPacketWriter::CheckEq(int actual_code, int expected_code) {
224 if (actual_code == expected_code)
225 return;
226 PERFETTO_FATAL("Expected %d got %d: %s", actual_code, expected_code,
227 stream_.msg);
228 }
229
Deflate(const uint8_t * ptr,size_t size)230 void ZipPacketWriter::Deflate(const uint8_t* ptr, size_t size) {
231 PERFETTO_CHECK(is_compressing_);
232 stream_.next_in = ptr;
233 stream_.avail_in = static_cast<unsigned int>(size);
234 CheckEq(deflate(&stream_, Z_NO_FLUSH), Z_OK);
235 PERFETTO_CHECK(stream_.avail_in == 0);
236 pending_bytes_ += size;
237 }
238
239 } // namespace
240
PacketWriter()241 PacketWriter::PacketWriter() {}
242
~PacketWriter()243 PacketWriter::~PacketWriter() {}
244
CreateFilePacketWriter(FILE * fd)245 std::unique_ptr<PacketWriter> CreateFilePacketWriter(FILE* fd) {
246 return std::unique_ptr<PacketWriter>(new FilePacketWriter(fd));
247 }
248
CreateZipPacketWriter(std::unique_ptr<PacketWriter> writer)249 std::unique_ptr<PacketWriter> CreateZipPacketWriter(
250 std::unique_ptr<PacketWriter> writer) {
251 return std::unique_ptr<PacketWriter>(new ZipPacketWriter(std::move(writer)));
252 }
253
254 } // namespace perfetto
255