• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/perfetto_cmd/packet_writer.h"
18 
19 #include <array>
20 
21 #include <fcntl.h>
22 #include <getopt.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <zlib.h>
28 
29 #include "perfetto/base/paged_memory.h"
30 #include "perfetto/protozero/proto_utils.h"
31 #include "perfetto/tracing/core/trace_packet.h"
32 
33 namespace perfetto {
34 namespace {
35 
36 using protozero::proto_utils::kMessageLengthFieldSize;
37 using protozero::proto_utils::MakeTagLengthDelimited;
38 using protozero::proto_utils::WriteRedundantVarInt;
39 using protozero::proto_utils::WriteVarInt;
40 using Preamble = std::array<char, 16>;
41 
42 // ID of the |packet| field in trace.proto. Hardcoded as this we don't
43 // want to depend on protos/trace:lite for binary size saving reasons.
44 constexpr uint32_t kPacketId = 1;
45 // ID of |compressed_packets| in trace_packet.proto.
46 constexpr uint32_t kCompressedPacketsId = 50;
47 
48 // Maximum allowable size for a single packet.
49 const size_t kMaxPacketSize = 500 * 1024;
50 // After every kPendingBytesLimit we do a Z_SYNC_FLUSH in the zlib stream.
51 const size_t kPendingBytesLimit = 32 * 1024;
52 
53 template <uint32_t id>
GetPreamble(size_t sz,Preamble * preamble)54 size_t GetPreamble(size_t sz, Preamble* preamble) {
55   uint8_t* ptr = reinterpret_cast<uint8_t*>(preamble->data());
56   constexpr uint32_t tag = MakeTagLengthDelimited(id);
57   ptr = WriteVarInt(tag, ptr);
58   ptr = WriteVarInt(sz, ptr);
59   size_t preamble_size = reinterpret_cast<uintptr_t>(ptr) -
60                          reinterpret_cast<uintptr_t>(preamble->data());
61   PERFETTO_DCHECK(preamble_size < preamble->size());
62   return preamble_size;
63 }
64 
65 class FilePacketWriter : public PacketWriter {
66  public:
67   FilePacketWriter(FILE* fd);
68   ~FilePacketWriter() override;
69   bool WritePackets(const std::vector<TracePacket>& packets) override;
70 
71  private:
72   FILE* fd_;
73 };
74 
75 class ZipPacketWriter : public PacketWriter {
76  public:
77   ZipPacketWriter(std::unique_ptr<PacketWriter>);
78   ~ZipPacketWriter() override;
79   bool WritePackets(const std::vector<TracePacket>& packets) override;
80 
81  private:
82   bool WritePacket(const TracePacket& packet);
83   void CheckEq(int actual_code, int expected_code);
84   bool FinalizeCompressedPacket();
Deflate(const char * ptr,size_t size)85   inline void Deflate(const char* ptr, size_t size) {
86     return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
87   }
Deflate(const void * ptr,size_t size)88   inline void Deflate(const void* ptr, size_t size) {
89     return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
90   }
91   void Deflate(const uint8_t* ptr, size_t size);
92 
93   std::unique_ptr<PacketWriter> writer_;
94   z_stream stream_{};
95 
96   base::PagedMemory buf_;
97   uint8_t* const start_;
98   uint8_t* const end_;
99 
100   bool is_compressing_ = false;
101   size_t pending_bytes_ = 0;
102 };
103 
FilePacketWriter(FILE * fd)104 FilePacketWriter::FilePacketWriter(FILE* fd) : fd_(fd) {}
105 
~FilePacketWriter()106 FilePacketWriter::~FilePacketWriter() {
107   fflush(fd_);
108 }
109 
WritePackets(const std::vector<TracePacket> & packets)110 bool FilePacketWriter::WritePackets(const std::vector<TracePacket>& packets) {
111   for (const TracePacket& packet : packets) {
112     Preamble preamble;
113     size_t size = GetPreamble<kPacketId>(packet.size(), &preamble);
114     if (fwrite(preamble.data(), 1, size, fd_) != size)
115       return false;
116     for (const Slice& slice : packet.slices()) {
117       if (fwrite(reinterpret_cast<const char*>(slice.start), 1, slice.size,
118                  fd_) != slice.size) {
119         return false;
120       }
121     }
122   }
123 
124   return true;
125 }
126 
ZipPacketWriter(std::unique_ptr<PacketWriter> writer)127 ZipPacketWriter::ZipPacketWriter(std::unique_ptr<PacketWriter> writer)
128     : writer_(std::move(writer)),
129       buf_(base::PagedMemory::Allocate(kMaxPacketSize)),
130       start_(static_cast<uint8_t*>(buf_.Get())),
131       end_(start_ + buf_.size()) {}
132 
~ZipPacketWriter()133 ZipPacketWriter::~ZipPacketWriter() {
134   if (is_compressing_)
135     FinalizeCompressedPacket();
136 }
137 
WritePackets(const std::vector<TracePacket> & packets)138 bool ZipPacketWriter::WritePackets(const std::vector<TracePacket>& packets) {
139   for (const TracePacket& packet : packets) {
140     if (!WritePacket(packet))
141       return false;
142   }
143   return true;
144 }
145 
WritePacket(const TracePacket & packet)146 bool ZipPacketWriter::WritePacket(const TracePacket& packet) {
147   // If we have already written one compressed packet, check whether we should
148   // flush the buffer.
149   if (is_compressing_) {
150     // We have two goals:
151     // - Fit as much data as possible into each packet
152     // - Ensure each packet is under 512KB
153     // We keep track of two numbers:
154     // - the number of remaining bytes in the output buffer
155     // - the number of (pending) uncompressed bytes written since the last flush
156     // The pending bytes may or may not have appeared in output buffer.
157     // Assuming in the worst case each uncompressed input byte can turn into
158     // two compressed bytes we can ensure we don't go over 512KB by not letting
159     // the number of pending bytes go over remaining bytes/2 - however often
160     // each input byte will not turn into 2 output bytes but less than 1 output
161     // byte - so this underfills the packet. To avoid this every 32kb we deflate
162     // with Z_SYNC_FLUSH ensuring all pending bytes are present in the output
163     // buffer.
164     if (pending_bytes_ > kPendingBytesLimit) {
165       CheckEq(deflate(&stream_, Z_SYNC_FLUSH), Z_OK);
166       pending_bytes_ = 0;
167     }
168 
169     PERFETTO_DCHECK(end_ >= stream_.next_out);
170     size_t remaining = static_cast<size_t>(end_ - stream_.next_out);
171     if ((pending_bytes_ + packet.size() + 1024) * 2 > remaining) {
172       if (!FinalizeCompressedPacket()) {
173         return false;
174       }
175     }
176   }
177 
178   // Reinitialize the compresser if needed:
179   if (!is_compressing_) {
180     memset(&stream_, 0, sizeof(stream_));
181     CheckEq(deflateInit(&stream_, 9), Z_OK);
182     is_compressing_ = true;
183     stream_.next_out = start_;
184     stream_.avail_out = static_cast<unsigned int>(end_ - start_);
185   }
186 
187   // Compress the trace packet header:
188   Preamble packet_hdr;
189   size_t packet_hdr_size = GetPreamble<kPacketId>(packet.size(), &packet_hdr);
190   Deflate(packet_hdr.data(), packet_hdr_size);
191 
192   // Compress the trace packet itself:
193   for (const Slice& slice : packet.slices()) {
194     Deflate(slice.start, slice.size);
195   }
196 
197   return true;
198 }
199 
FinalizeCompressedPacket()200 bool ZipPacketWriter::FinalizeCompressedPacket() {
201   PERFETTO_DCHECK(is_compressing_);
202 
203   CheckEq(deflate(&stream_, Z_FINISH), Z_STREAM_END);
204 
205   size_t size = static_cast<size_t>(stream_.next_out - start_);
206   Preamble preamble;
207   size_t preamble_size = GetPreamble<kCompressedPacketsId>(size, &preamble);
208 
209   std::vector<TracePacket> out_packets(1);
210   TracePacket& out_packet = out_packets[0];
211   out_packet.AddSlice(preamble.data(), preamble_size);
212   out_packet.AddSlice(start_, size);
213 
214   if (!writer_->WritePackets(out_packets))
215     return false;
216 
217   is_compressing_ = false;
218   pending_bytes_ = 0;
219   CheckEq(deflateEnd(&stream_), Z_OK);
220   return true;
221 }
222 
CheckEq(int actual_code,int expected_code)223 void ZipPacketWriter::CheckEq(int actual_code, int expected_code) {
224   if (actual_code == expected_code)
225     return;
226   PERFETTO_FATAL("Expected %d got %d: %s", actual_code, expected_code,
227                  stream_.msg);
228 }
229 
Deflate(const uint8_t * ptr,size_t size)230 void ZipPacketWriter::Deflate(const uint8_t* ptr, size_t size) {
231   PERFETTO_CHECK(is_compressing_);
232   stream_.next_in = ptr;
233   stream_.avail_in = static_cast<unsigned int>(size);
234   CheckEq(deflate(&stream_, Z_NO_FLUSH), Z_OK);
235   PERFETTO_CHECK(stream_.avail_in == 0);
236   pending_bytes_ += size;
237 }
238 
239 }  // namespace
240 
PacketWriter()241 PacketWriter::PacketWriter() {}
242 
~PacketWriter()243 PacketWriter::~PacketWriter() {}
244 
CreateFilePacketWriter(FILE * fd)245 std::unique_ptr<PacketWriter> CreateFilePacketWriter(FILE* fd) {
246   return std::unique_ptr<PacketWriter>(new FilePacketWriter(fd));
247 }
248 
CreateZipPacketWriter(std::unique_ptr<PacketWriter> writer)249 std::unique_ptr<PacketWriter> CreateZipPacketWriter(
250     std::unique_ptr<PacketWriter> writer) {
251   return std::unique_ptr<PacketWriter>(new ZipPacketWriter(std::move(writer)));
252 }
253 
254 }  // namespace perfetto
255