1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_multisink/log_queue.h"
16
17 #include "pw_assert/assert.h"
18 #include "pw_log/levels.h"
19 #include "pw_log_proto/log.pwpb.h"
20 #include "pw_protobuf/wire_format.h"
21 #include "pw_status/try.h"
22
23 namespace pw::log_rpc {
24 namespace {
25
26 using pw::protobuf::WireType;
27 constexpr uint32_t kLogKey = pw::protobuf::MakeKey(
28 static_cast<uint32_t>(pw::log::LogEntries::Fields::ENTRIES),
29 WireType::kDelimited);
30
31 } // namespace
32
PushTokenizedMessage(ConstByteSpan message,uint32_t flags,uint32_t level,uint32_t line,uint32_t thread,int64_t timestamp)33 Status LogQueue::PushTokenizedMessage(ConstByteSpan message,
34 uint32_t flags,
35 uint32_t level,
36 uint32_t line,
37 uint32_t thread,
38 int64_t timestamp) {
39 pw::protobuf::NestedEncoder nested_encoder(encode_buffer_);
40 pw::log::LogEntry::Encoder encoder(&nested_encoder);
41 Status status;
42
43 encoder.WriteMessageTokenized(message);
44 encoder.WriteLineLevel(
45 (level & PW_LOG_LEVEL_BITMASK) |
46 ((line << PW_LOG_LEVEL_BITWIDTH) & ~PW_LOG_LEVEL_BITMASK));
47 encoder.WriteFlags(flags);
48 encoder.WriteThreadTokenized(thread);
49
50 // TODO(prashanthsw): Add support for delta encoding of the timestamp.
51 encoder.WriteTimestamp(timestamp);
52
53 if (dropped_entries_ > 0) {
54 encoder.WriteDropped(dropped_entries_);
55 }
56
57 ConstByteSpan log_entry;
58 status = nested_encoder.Encode(&log_entry);
59 if (!status.ok() || log_entry.size_bytes() > max_log_entry_size_) {
60 // If an encoding failure occurs or the constructed log entry is larger
61 // than the configured max size, map the error to INTERNAL. If the
62 // underlying allocation of this encode buffer or the nested encoding
63 // sequencing are at fault, they are not the caller's responsibility. If
64 // the log entry is larger than the max allowed size, the log is dropped
65 // intentionally, and it is expected that the caller accepts this
66 // possibility.
67 status = PW_STATUS_INTERNAL;
68 } else {
69 // Try to push back the encoded log entry.
70 status = ring_buffer_.TryPushBack(log_entry, kLogKey);
71 }
72
73 if (!status.ok()) {
74 // The ring buffer may hit the RESOURCE_EXHAUSTED state, causing us
75 // to drop packets. However, this check captures all failures from
76 // Encode and TryPushBack, as any failure here causes packet drop.
77 dropped_entries_++;
78 latest_dropped_timestamp_ = timestamp;
79 return status;
80 }
81
82 dropped_entries_ = 0;
83 return OkStatus();
84 }
85
Pop(LogEntriesBuffer entry_buffer)86 Result<LogEntries> LogQueue::Pop(LogEntriesBuffer entry_buffer) {
87 size_t ring_buffer_entry_size = 0;
88 PW_TRY(pop_status_for_test_);
89 // The caller must provide a buffer that is at minimum max_log_entry_size, to
90 // ensure that the front entry of the ring buffer can be popped.
91 PW_DCHECK_UINT_GE(entry_buffer.size_bytes(), max_log_entry_size_);
92 PW_TRY(ring_buffer_.PeekFrontWithPreamble(entry_buffer,
93 &ring_buffer_entry_size));
94 PW_DCHECK_OK(ring_buffer_.PopFront());
95
96 return LogEntries{
97 .entries = ConstByteSpan(entry_buffer.first(ring_buffer_entry_size)),
98 .entry_count = 1};
99 }
100
PopMultiple(LogEntriesBuffer entries_buffer)101 LogEntries LogQueue::PopMultiple(LogEntriesBuffer entries_buffer) {
102 size_t offset = 0;
103 size_t entry_count = 0;
104
105 // The caller must provide a buffer that is at minimum max_log_entry_size, to
106 // ensure that the front entry of the ring buffer can be popped.
107 PW_DCHECK_UINT_GE(entries_buffer.size_bytes(), max_log_entry_size_);
108
109 while (ring_buffer_.EntryCount() > 0 &&
110 (entries_buffer.size_bytes() - offset) > max_log_entry_size_) {
111 const Result<LogEntries> result = Pop(entries_buffer.subspan(offset));
112 if (!result.ok()) {
113 break;
114 }
115 offset += result.value().entries.size_bytes();
116 entry_count += result.value().entry_count;
117 }
118
119 return LogEntries{.entries = ConstByteSpan(entries_buffer.first(offset)),
120 .entry_count = entry_count};
121 }
122
123 } // namespace pw::log_rpc
124