• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/ports/message_queue.h"
6 
7 #include <algorithm>
8 
9 #include "base/logging.h"
10 #include "mojo/edk/system/ports/event.h"
11 #include "mojo/edk/system/ports/message_filter.h"
12 
13 namespace mojo {
14 namespace edk {
15 namespace ports {
16 
GetSequenceNum(const ScopedMessage & message)17 inline uint64_t GetSequenceNum(const ScopedMessage& message) {
18   return GetEventData<UserEventData>(*message)->sequence_num;
19 }
20 
21 // Used by std::{push,pop}_heap functions
operator <(const ScopedMessage & a,const ScopedMessage & b)22 inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) {
23   return GetSequenceNum(a) > GetSequenceNum(b);
24 }
25 
MessageQueue()26 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
27 
MessageQueue(uint64_t next_sequence_num)28 MessageQueue::MessageQueue(uint64_t next_sequence_num)
29     : next_sequence_num_(next_sequence_num) {
30   // The message queue is blocked waiting for a message with sequence number
31   // equal to |next_sequence_num|.
32 }
33 
~MessageQueue()34 MessageQueue::~MessageQueue() {
35 #if DCHECK_IS_ON()
36   size_t num_leaked_ports = 0;
37   for (const auto& message : heap_)
38     num_leaked_ports += message->num_ports();
39   DVLOG_IF(1, num_leaked_ports > 0)
40       << "Leaking " << num_leaked_ports << " ports in unreceived messages";
41 #endif
42 }
43 
HasNextMessage() const44 bool MessageQueue::HasNextMessage() const {
45   return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
46 }
47 
GetNextMessage(ScopedMessage * message,MessageFilter * filter)48 void MessageQueue::GetNextMessage(ScopedMessage* message,
49                                   MessageFilter* filter) {
50   if (!HasNextMessage() || (filter && !filter->Match(*heap_[0].get()))) {
51     message->reset();
52     return;
53   }
54 
55   std::pop_heap(heap_.begin(), heap_.end());
56   *message = std::move(heap_.back());
57   heap_.pop_back();
58 
59   next_sequence_num_++;
60 }
61 
AcceptMessage(ScopedMessage message,bool * has_next_message)62 void MessageQueue::AcceptMessage(ScopedMessage message,
63                                  bool* has_next_message) {
64   DCHECK(GetEventHeader(*message)->type == EventType::kUser);
65 
66   // TODO: Handle sequence number roll-over.
67 
68   heap_.emplace_back(std::move(message));
69   std::push_heap(heap_.begin(), heap_.end());
70 
71   if (!signalable_) {
72     *has_next_message = false;
73   } else {
74     *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_);
75   }
76 }
77 
GetReferencedPorts(std::deque<PortName> * port_names)78 void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) {
79   for (const auto& message : heap_) {
80     for (size_t i = 0; i < message->num_ports(); ++i)
81       port_names->push_back(message->ports()[i]);
82   }
83 }
84 
85 }  // namespace ports
86 }  // namespace edk
87 }  // namespace mojo
88