1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/perfetto_cmd/packet_writer.h"
18
19 #include <array>
20
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <sys/stat.h>
25
26 #include "perfetto/base/build_config.h"
27 #include "perfetto/ext/base/getopt.h"
28 #include "perfetto/ext/base/paged_memory.h"
29 #include "perfetto/ext/base/utils.h"
30 #include "perfetto/ext/tracing/core/trace_packet.h"
31 #include "perfetto/protozero/proto_utils.h"
32
33 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
34 #include <zlib.h>
35 #endif
36
37 namespace perfetto {
38 namespace {
39
40 using protozero::proto_utils::MakeTagLengthDelimited;
41 using protozero::proto_utils::WriteVarInt;
42 using Preamble = std::array<char, 16>;
43
44 // ID of the |packet| field in trace.proto. Hardcoded as this we don't
45 // want to depend on protos/trace:lite for binary size saving reasons.
46 constexpr uint32_t kPacketId = 1;
47
48 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
49
50 // ID of |compressed_packets| in trace_packet.proto.
51 constexpr uint32_t kCompressedPacketsId = 50;
52
53 // Some transport mechanisms have a 512kb limit on packet size.
54 // ZipPacketWriter respects this limit where possible and does
55 // not produce compressed packets larger than 512kb. This is
56 // constant is deliberately conservative to leave plenty of
57 // room for the transport to add additional headers etc.
58 const size_t kMaxPacketSize = 500 * 1024;
59
60 // After every kPendingBytesLimit we do a Z_SYNC_FLUSH in the zlib stream.
61 const size_t kPendingBytesLimit = 32 * 1024;
62
63 #endif // PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
64
65 template <uint32_t id>
GetPreamble(size_t sz,Preamble * preamble)66 size_t GetPreamble(size_t sz, Preamble* preamble) {
67 uint8_t* ptr = reinterpret_cast<uint8_t*>(preamble->data());
68 constexpr uint32_t tag = MakeTagLengthDelimited(id);
69 ptr = WriteVarInt(tag, ptr);
70 ptr = WriteVarInt(sz, ptr);
71 size_t preamble_size = reinterpret_cast<uintptr_t>(ptr) -
72 reinterpret_cast<uintptr_t>(preamble->data());
73 PERFETTO_DCHECK(preamble_size < preamble->size());
74 return preamble_size;
75 }
76
77 class FilePacketWriter : public PacketWriter {
78 public:
79 FilePacketWriter(FILE* fd);
80 ~FilePacketWriter() override;
81 bool WritePacket(const TracePacket& packet) override;
82
83 private:
84 FILE* fd_;
85 };
86
FilePacketWriter(FILE * fd)87 FilePacketWriter::FilePacketWriter(FILE* fd) : fd_(fd) {}
88
~FilePacketWriter()89 FilePacketWriter::~FilePacketWriter() {
90 fflush(fd_);
91 }
92
WritePacket(const TracePacket & packet)93 bool FilePacketWriter::WritePacket(const TracePacket& packet) {
94 Preamble preamble;
95 size_t size = GetPreamble<kPacketId>(packet.size(), &preamble);
96 if (fwrite(preamble.data(), 1, size, fd_) != size)
97 return false;
98 for (const Slice& slice : packet.slices()) {
99 if (fwrite(reinterpret_cast<const char*>(slice.start), 1, slice.size,
100 fd_) != slice.size) {
101 return false;
102 }
103 }
104
105 return true;
106 }
107
108 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
109
110 class ZipPacketWriter : public PacketWriter {
111 public:
112 ZipPacketWriter(std::unique_ptr<PacketWriter>);
113 ~ZipPacketWriter() override;
114 bool WritePacket(const TracePacket& packet) override;
115
116 private:
117 void CheckEq(int actual_code, int expected_code);
118 bool FinalizeCompressedPacket();
Deflate(const char * ptr,size_t size)119 inline void Deflate(const char* ptr, size_t size) {
120 return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
121 }
Deflate(const void * ptr,size_t size)122 inline void Deflate(const void* ptr, size_t size) {
123 return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
124 }
125 void Deflate(const uint8_t* ptr, size_t size);
126
127 std::unique_ptr<PacketWriter> writer_;
128 z_stream stream_{};
129
130 base::PagedMemory buf_;
131 uint8_t* const start_;
132 uint8_t* const end_;
133
134 bool is_compressing_ = false;
135 size_t pending_bytes_ = 0;
136 };
137
ZipPacketWriter(std::unique_ptr<PacketWriter> writer)138 ZipPacketWriter::ZipPacketWriter(std::unique_ptr<PacketWriter> writer)
139 : writer_(std::move(writer)),
140 buf_(base::PagedMemory::Allocate(kMaxPacketSize)),
141 start_(static_cast<uint8_t*>(buf_.Get())),
142 end_(start_ + buf_.size()) {}
143
~ZipPacketWriter()144 ZipPacketWriter::~ZipPacketWriter() {
145 if (is_compressing_)
146 FinalizeCompressedPacket();
147 }
148
WritePacket(const TracePacket & packet)149 bool ZipPacketWriter::WritePacket(const TracePacket& packet) {
150 // If we have already written one compressed packet, check whether we should
151 // flush the buffer.
152 if (is_compressing_) {
153 // We have two goals:
154 // - Fit as much data as possible into each packet
155 // - Ensure each packet is under 512KB
156 // We keep track of two numbers:
157 // - the number of remaining bytes in the output buffer
158 // - the number of (pending) uncompressed bytes written since the last flush
159 // The pending bytes may or may not have appeared in output buffer.
160 // Assuming in the worst case each uncompressed input byte can turn into
161 // two compressed bytes we can ensure we don't go over 512KB by not letting
162 // the number of pending bytes go over remaining bytes/2 - however often
163 // each input byte will not turn into 2 output bytes but less than 1 output
164 // byte - so this underfills the packet. To avoid this every 32kb we deflate
165 // with Z_SYNC_FLUSH ensuring all pending bytes are present in the output
166 // buffer.
167 if (pending_bytes_ > kPendingBytesLimit) {
168 CheckEq(deflate(&stream_, Z_SYNC_FLUSH), Z_OK);
169 pending_bytes_ = 0;
170 }
171
172 PERFETTO_DCHECK(end_ >= stream_.next_out);
173 size_t remaining = static_cast<size_t>(end_ - stream_.next_out);
174 if ((pending_bytes_ + packet.size() + 1024) * 2 > remaining) {
175 if (!FinalizeCompressedPacket()) {
176 return false;
177 }
178 }
179 }
180
181 // Do not attempt to compress large packets (since they may overflow our
182 // output buffer) instead insert them directly into the output stream:
183 if (packet.size() > kMaxPacketSize) {
184 // We can't be compressing here, if we were we would have attempted
185 // to FinalizeCompressedPacket() above:
186 PERFETTO_DCHECK(!is_compressing_);
187 return writer_->WritePacket(packet);
188 }
189
190 // Reinitialize the compresser if needed:
191 if (!is_compressing_) {
192 memset(&stream_, 0, sizeof(stream_));
193 CheckEq(deflateInit(&stream_, 6), Z_OK);
194 is_compressing_ = true;
195 stream_.next_out = start_;
196 stream_.avail_out = static_cast<unsigned int>(end_ - start_);
197 }
198
199 // Compress the trace packet header:
200 Preamble packet_hdr;
201 size_t packet_hdr_size = GetPreamble<kPacketId>(packet.size(), &packet_hdr);
202 Deflate(packet_hdr.data(), packet_hdr_size);
203
204 // Compress the trace packet itself:
205 for (const Slice& slice : packet.slices()) {
206 Deflate(slice.start, slice.size);
207 }
208
209 return true;
210 }
211
FinalizeCompressedPacket()212 bool ZipPacketWriter::FinalizeCompressedPacket() {
213 PERFETTO_DCHECK(is_compressing_);
214
215 CheckEq(deflate(&stream_, Z_FINISH), Z_STREAM_END);
216
217 size_t size = static_cast<size_t>(stream_.next_out - start_);
218 Preamble preamble;
219 size_t preamble_size = GetPreamble<kCompressedPacketsId>(size, &preamble);
220
221 std::vector<TracePacket> out_packets(1);
222 TracePacket& out_packet = out_packets[0];
223 out_packet.AddSlice(preamble.data(), preamble_size);
224 out_packet.AddSlice(start_, size);
225
226 if (!writer_->WritePackets(out_packets))
227 return false;
228
229 is_compressing_ = false;
230 pending_bytes_ = 0;
231 CheckEq(deflateEnd(&stream_), Z_OK);
232 return true;
233 }
234
CheckEq(int actual_code,int expected_code)235 void ZipPacketWriter::CheckEq(int actual_code, int expected_code) {
236 if (actual_code == expected_code)
237 return;
238 PERFETTO_FATAL("Expected %d got %d: %s", actual_code, expected_code,
239 stream_.msg);
240 }
241
Deflate(const uint8_t * ptr,size_t size)242 void ZipPacketWriter::Deflate(const uint8_t* ptr, size_t size) {
243 PERFETTO_CHECK(is_compressing_);
244 stream_.next_in = const_cast<uint8_t*>(ptr);
245 stream_.avail_in = static_cast<unsigned int>(size);
246 CheckEq(deflate(&stream_, Z_NO_FLUSH), Z_OK);
247 PERFETTO_CHECK(stream_.avail_in == 0);
248 pending_bytes_ += size;
249 }
250
251 #endif // PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
252
253 } // namespace
254
PacketWriter()255 PacketWriter::PacketWriter() {}
256
~PacketWriter()257 PacketWriter::~PacketWriter() {}
258
CreateFilePacketWriter(FILE * fd)259 std::unique_ptr<PacketWriter> CreateFilePacketWriter(FILE* fd) {
260 return std::unique_ptr<PacketWriter>(new FilePacketWriter(fd));
261 }
262
263 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
CreateZipPacketWriter(std::unique_ptr<PacketWriter> writer)264 std::unique_ptr<PacketWriter> CreateZipPacketWriter(
265 std::unique_ptr<PacketWriter> writer) {
266 return std::unique_ptr<PacketWriter>(new ZipPacketWriter(std::move(writer)));
267 }
268 #endif
269
270 } // namespace perfetto
271