• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef ANDROID_DVR_BUFFER_HUB_QUEUE_CLIENT_H_
2 #define ANDROID_DVR_BUFFER_HUB_QUEUE_CLIENT_H_
3 
4 #include <gui/BufferQueueDefs.h>
5 
6 #include <pdx/client.h>
7 #include <pdx/status.h>
8 #include <private/dvr/buffer_hub_client.h>
9 #include <private/dvr/epoll_file_descriptor.h>
10 #include <private/dvr/ring_buffer.h>
11 
12 #include <memory>
13 #include <vector>
14 
15 namespace android {
16 namespace dvr {
17 
18 class ConsumerQueue;
19 
20 // |BufferHubQueue| manages a queue of |BufferHubBuffer|s. Buffers are
21 // automatically re-requeued when released by the remote side.
22 class BufferHubQueue : public pdx::Client {
23  public:
24   using LocalHandle = pdx::LocalHandle;
25   using LocalChannelHandle = pdx::LocalChannelHandle;
26   template <typename T>
27   using Status = pdx::Status<T>;
28 
~BufferHubQueue()29   virtual ~BufferHubQueue() {}
30   void Initialize();
31 
32   // Create a new consumer queue that is attached to the producer. Returns
33   // a new consumer queue client or nullptr on failure.
34   std::unique_ptr<ConsumerQueue> CreateConsumerQueue();
35 
36   // Create a new consumer queue that is attached to the producer. This queue
37   // sets each of its imported consumer buffers to the ignored state to avoid
38   // participation in lifecycle events.
39   std::unique_ptr<ConsumerQueue> CreateSilentConsumerQueue();
40 
41   // Return the default buffer width of this buffer queue.
default_width()42   size_t default_width() const { return default_width_; }
43 
44   // Return the default buffer height of this buffer queue.
default_height()45   size_t default_height() const { return default_height_; }
46 
47   // Return the default buffer format of this buffer queue.
default_format()48   int32_t default_format() const { return default_format_; }
49 
50   // Create a new consumer in handle form for immediate transport over RPC.
51   Status<LocalChannelHandle> CreateConsumerQueueHandle();
52 
53   // Return the number of buffers avaiable for dequeue.
count()54   size_t count() const { return available_buffers_.GetSize(); }
55 
56   // Return the total number of buffers that the queue is tracking.
capacity()57   size_t capacity() const { return capacity_; }
58 
59   // Return the size of metadata structure associated with this BufferBubQueue.
metadata_size()60   size_t metadata_size() const { return meta_size_; }
61 
62   // Return whether the buffer queue is alrady full.
is_full()63   bool is_full() const { return available_buffers_.IsFull(); }
64 
65   explicit operator bool() const { return epoll_fd_.IsValid(); }
66 
GetBuffer(size_t slot)67   std::shared_ptr<BufferHubBuffer> GetBuffer(size_t slot) const {
68     return buffers_[slot];
69   }
70 
GetEventMask(int events)71   Status<int> GetEventMask(int events) {
72     if (auto* client_channel = GetChannel()) {
73       return client_channel->GetEventMask(events);
74     } else {
75       return pdx::ErrorStatus(EINVAL);
76     }
77   }
78 
79   // Returns an fd that signals pending queue events using
80   // EPOLLIN/POLLIN/readible. Either HandleQueueEvents or WaitForBuffers may be
81   // called to handle pending queue events.
queue_fd()82   int queue_fd() const { return epoll_fd_.Get(); }
83 
84   // Handles any pending events, returning available buffers to the queue and
85   // reaping disconnected buffers. Returns true if successful, false if an error
86   // occurred.
HandleQueueEvents()87   bool HandleQueueEvents() { return WaitForBuffers(0); }
88 
89   // Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
90   // and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
91   void Enqueue(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
92 
93   // |BufferHubQueue| will keep track of at most this value of buffers.
94   static constexpr size_t kMaxQueueCapacity =
95       android::BufferQueueDefs::NUM_BUFFER_SLOTS;
96 
97   // Special epoll data field indicating that the epoll event refers to the
98   // queue.
99   static constexpr int64_t kEpollQueueEventIndex = -1;
100 
101   // When pass |kNoTimeout| to |Dequeue|, it will block indefinitely without a
102   // timeout.
103   static constexpr int kNoTimeOut = -1;
104 
id()105   int id() const { return id_; }
hung_up()106   bool hung_up() const { return hung_up_; }
107 
108  protected:
109   BufferHubQueue(LocalChannelHandle channel);
110   BufferHubQueue(const std::string& endpoint_path);
111 
112   // Imports the queue parameters by querying BufferHub for the parameters for
113   // this channel.
114   Status<void> ImportQueue();
115 
116   // Sets up the queue with the given parameters.
117   void SetupQueue(size_t meta_size_bytes_, int id);
118 
119   // Called by ProducerQueue::AddBuffer and ConsumerQueue::AddBuffer only. to
120   // register a buffer for epoll and internal bookkeeping.
121   int AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
122 
123   // Called by ProducerQueue::DetachBuffer and ConsumerQueue::DetachBuffer only.
124   // to deregister a buffer for epoll and internal bookkeeping.
125   virtual int DetachBuffer(size_t slot);
126 
127   // Dequeue a buffer from the free queue, blocking until one is available. The
128   // timeout argument specifies the number of milliseconds that |Dequeue()| will
129   // block. Specifying a timeout of -1 causes |Dequeue()| to block indefinitely,
130   // while specifying a timeout equal to zero cause |Dequeue()| to return
131   // immediately, even if no buffers are available.
132   pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue(int timeout,
133                                                         size_t* slot,
134                                                         void* meta,
135                                                         LocalHandle* fence);
136 
137   // Wait for buffers to be released and re-add them to the queue.
138   bool WaitForBuffers(int timeout);
139   void HandleBufferEvent(size_t slot, int poll_events);
140   void HandleQueueEvent(int poll_events);
141 
142   virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
143                             LocalHandle* fence) = 0;
144 
145   // Called when a buffer is allocated remotely.
OnBufferAllocated()146   virtual Status<void> OnBufferAllocated() { return {}; }
147 
148   // Data members to handle arbitrary metadata passed through BufferHub. It is
149   // fair to enforce that all buffers in the same queue share the same metadata
150   // type. |meta_size_| is used to store the size of metadata on queue creation;
151   // and |meta_buffer_tmp_| is allocated and resized to |meta_size_| on queue
152   // creation to be later used as temporary space so that we can avoid
153   // additional dynamic memory allocation in each |Enqueue| and |Dequeue| call.
154   size_t meta_size_;
155 
156   // Here we intentionally choose |unique_ptr<uint8_t[]>| over vector<uint8_t>
157   // to disallow dynamic resizing for stability reasons.
158   std::unique_ptr<uint8_t[]> meta_buffer_tmp_;
159 
160  private:
161   static constexpr size_t kMaxEvents = 128;
162 
163   // The |u64| data field of an epoll event is interpreted as int64_t:
164   // When |index| >= 0 and |index| < kMaxQueueCapacity it refers to a specific
165   // element of |buffers_| as a direct index;
is_buffer_event_index(int64_t index)166   static bool is_buffer_event_index(int64_t index) {
167     return index >= 0 &&
168            index < static_cast<int64_t>(BufferHubQueue::kMaxQueueCapacity);
169   }
170 
171   // When |index| == kEpollQueueEventIndex, it refers to the queue itself.
is_queue_event_index(int64_t index)172   static bool is_queue_event_index(int64_t index) {
173     return index == BufferHubQueue::kEpollQueueEventIndex;
174   }
175 
176   struct BufferInfo {
177     // A logical slot number that is assigned to a buffer at allocation time.
178     // The slot number remains unchanged during the entire life cycle of the
179     // buffer and should not be confused with the enqueue and dequeue order.
180     size_t slot;
181 
182     // A BufferHubBuffer client.
183     std::shared_ptr<BufferHubBuffer> buffer;
184 
185     // Metadata associated with the buffer.
186     std::unique_ptr<uint8_t[]> metadata;
187 
BufferInfoBufferInfo188     BufferInfo() : BufferInfo(-1, 0) {}
189 
BufferInfoBufferInfo190     BufferInfo(size_t slot, size_t metadata_size)
191         : slot(slot),
192           buffer(nullptr),
193           metadata(metadata_size ? new uint8_t[metadata_size] : nullptr) {}
194 
BufferInfoBufferInfo195     BufferInfo(BufferInfo&& other)
196         : slot(other.slot),
197           buffer(std::move(other.buffer)),
198           metadata(std::move(other.metadata)) {}
199 
200     BufferInfo& operator=(BufferInfo&& other) {
201       slot = other.slot;
202       buffer = std::move(other.buffer);
203       metadata = std::move(other.metadata);
204       return *this;
205     }
206 
207    private:
208     BufferInfo(const BufferInfo&) = delete;
209     void operator=(BufferInfo&) = delete;
210   };
211 
212   // Default buffer width that can be set to override the buffer width when a
213   // width and height of 0 are specified in AllocateBuffer.
214   size_t default_width_{1};
215 
216   // Default buffer height that can be set to override the buffer height when a
217   // width and height of 0 are specified in AllocateBuffer.
218   size_t default_height_{1};
219 
220   // Default buffer format that can be set to override the buffer format when it
221   // isn't specified in AllocateBuffer.
222   int32_t default_format_{PIXEL_FORMAT_RGBA_8888};
223 
224   // Buffer queue:
225   // |buffers_| tracks all |BufferHubBuffer|s created by this |BufferHubQueue|.
226   std::vector<std::shared_ptr<BufferHubBuffer>> buffers_;
227 
228   // |epollhup_pending_| tracks whether a slot of |buffers_| get detached before
229   // its corresponding EPOLLHUP event got handled. This could happen as the
230   // following sequence:
231   // 1. Producer queue's client side allocates a new buffer (at slot 1).
232   // 2. Producer queue's client side replaces an existing buffer (at slot 0).
233   //    This is implemented by first detaching the buffer and then allocating a
234   //    new buffer.
235   // 3. During the same epoll_wait, Consumer queue's client side gets EPOLLIN
236   //    event on the queue which indicates a new buffer is available and the
237   //    EPOLLHUP event for slot 0. Consumer handles these two events in order.
238   // 4. Consumer client calls BufferHubRPC::ConsumerQueueImportBuffers and both
239   //    slot 0 and (the new) slot 1 buffer will be imported. During the import
240   //    of the buffer at slot 1, consumer client detaches the old buffer so that
241   //    the new buffer can be registered. At the same time
242   //    |epollhup_pending_[slot]| is marked to indicate that buffer at this slot
243   //    was detached prior to EPOLLHUP event.
244   // 5. Consumer client continues to handle the EPOLLHUP. Since
245   //    |epollhup_pending_[slot]| is marked as true, it can safely ignore the
246   //    event without detaching the newly allocated buffer at slot 1.
247   //
248   // In normal situations where the previously described sequence doesn't
249   // happen, an EPOLLHUP event should trigger a regular buffer detach.
250   std::vector<bool> epollhup_pending_;
251 
252   // |available_buffers_| uses |dvr::RingBuffer| to implementation queue
253   // sematics. When |Dequeue|, we pop the front element from
254   // |available_buffers_|, and  that buffer's reference count will decrease by
255   // one, while another reference in |buffers_| keeps the last reference to
256   // prevent the buffer from being deleted.
257   RingBuffer<BufferInfo> available_buffers_;
258 
259   // Fences (acquire fence for consumer and release fence for consumer) , one
260   // for each buffer slot.
261   std::vector<LocalHandle> fences_;
262 
263   // Keep track with how many buffers have been added into the queue.
264   size_t capacity_;
265 
266   // Epoll fd used to wait for BufferHub events.
267   EpollFileDescriptor epoll_fd_;
268 
269   // Flag indicating that the other side hung up. For ProducerQueues this
270   // triggers when BufferHub dies or explicitly closes the queue channel. For
271   // ConsumerQueues this can either mean the same or that the ProducerQueue on
272   // the other end hung up.
273   bool hung_up_{false};
274 
275   // Global id for the queue that is consistent across processes.
276   int id_;
277 
278   BufferHubQueue(const BufferHubQueue&) = delete;
279   void operator=(BufferHubQueue&) = delete;
280 };
281 
282 class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
283  public:
284   template <typename Meta>
Create()285   static std::unique_ptr<ProducerQueue> Create() {
286     return BASE::Create(sizeof(Meta));
287   }
Create(size_t meta_size_bytes)288   static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes) {
289     return BASE::Create(meta_size_bytes);
290   }
291 
292   // Usage bits in |usage_set_mask| will be automatically masked on. Usage bits
293   // in |usage_clear_mask| will be automatically masked off. Note that
294   // |usage_set_mask| and |usage_clear_mask| may conflict with each other, but
295   // |usage_set_mask| takes precedence over |usage_clear_mask|. All buffer
296   // allocation through this producer queue shall not have any of the usage bits
297   // in |usage_deny_set_mask| set. Allocation calls violating this will be
298   // rejected. All buffer allocation through this producer queue must have all
299   // the usage bits in |usage_deny_clear_mask| set. Allocation calls violating
300   // this will be rejected. Note that |usage_deny_set_mask| and
301   // |usage_deny_clear_mask| shall not conflict with each other. Such
302   // configuration will be treated as invalid input on creation.
303   template <typename Meta>
Create(uint32_t usage_set_mask,uint32_t usage_clear_mask,uint32_t usage_deny_set_mask,uint32_t usage_deny_clear_mask)304   static std::unique_ptr<ProducerQueue> Create(uint32_t usage_set_mask,
305                                                uint32_t usage_clear_mask,
306                                                uint32_t usage_deny_set_mask,
307                                                uint32_t usage_deny_clear_mask) {
308     return BASE::Create(sizeof(Meta), usage_set_mask, usage_clear_mask,
309                         usage_deny_set_mask, usage_deny_clear_mask);
310   }
Create(size_t meta_size_bytes,uint32_t usage_set_mask,uint32_t usage_clear_mask,uint32_t usage_deny_set_mask,uint32_t usage_deny_clear_mask)311   static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes,
312                                                uint32_t usage_set_mask,
313                                                uint32_t usage_clear_mask,
314                                                uint32_t usage_deny_set_mask,
315                                                uint32_t usage_deny_clear_mask) {
316     return BASE::Create(meta_size_bytes, usage_set_mask, usage_clear_mask,
317                         usage_deny_set_mask, usage_deny_clear_mask);
318   }
319 
320   // Import a |ProducerQueue| from a channel handle.
Import(LocalChannelHandle handle)321   static std::unique_ptr<ProducerQueue> Import(LocalChannelHandle handle) {
322     return BASE::Create(std::move(handle));
323   }
324 
325   // Get a buffer producer. Note that the method doesn't check whether the
326   // buffer slot has a valid buffer that has been allocated already. When no
327   // buffer has been imported before it returns |nullptr|; otherwise it returns
328   // a shared pointer to a |BufferProducer|.
GetBuffer(size_t slot)329   std::shared_ptr<BufferProducer> GetBuffer(size_t slot) const {
330     return std::static_pointer_cast<BufferProducer>(
331         BufferHubQueue::GetBuffer(slot));
332   }
333 
334   // Allocate producer buffer to populate the queue. Once allocated, a producer
335   // buffer is automatically enqueue'd into the ProducerQueue and available to
336   // use (i.e. in |Gain|'ed mode).
337   // Returns Zero on success and negative error code when buffer allocation
338   // fails.
339   int AllocateBuffer(uint32_t width, uint32_t height, uint32_t layer_count,
340                      uint32_t format, uint64_t usage, size_t* out_slot);
341 
342   // Add a producer buffer to populate the queue. Once added, a producer buffer
343   // is available to use (i.e. in |Gain|'ed mode).
344   int AddBuffer(const std::shared_ptr<BufferProducer>& buf, size_t slot);
345 
346   // Detach producer buffer from the queue.
347   // Returns Zero on success and negative error code when buffer detach
348   // fails.
349   int DetachBuffer(size_t slot) override;
350 
351   // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
352   // and caller should call Post() once it's done writing to release the buffer
353   // to the consumer side.
354   pdx::Status<std::shared_ptr<BufferProducer>> Dequeue(
355       int timeout, size_t* slot, LocalHandle* release_fence);
356 
357  private:
358   friend BASE;
359 
360   // Constructors are automatically exposed through ProducerQueue::Create(...)
361   // static template methods inherited from ClientBase, which take the same
362   // arguments as the constructors.
363   explicit ProducerQueue(size_t meta_size);
364   ProducerQueue(LocalChannelHandle handle);
365   ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
366                 uint64_t usage_clear_mask, uint64_t usage_deny_set_mask,
367                 uint64_t usage_deny_clear_mask);
368 
369   int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
370                     LocalHandle* release_fence) override;
371 };
372 
373 // Explicit specializations of ProducerQueue::Create for void metadata type.
374 template <>
375 inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>() {
376   return ProducerQueue::Create(0);
377 }
378 template <>
379 inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>(
380     uint32_t usage_set_mask, uint32_t usage_clear_mask,
381     uint32_t usage_deny_set_mask, uint32_t usage_deny_clear_mask) {
382   return ProducerQueue::Create(0, usage_set_mask, usage_clear_mask,
383                                usage_deny_set_mask, usage_deny_clear_mask);
384 }
385 
386 class ConsumerQueue : public BufferHubQueue {
387  public:
388   // Get a buffer consumer. Note that the method doesn't check whether the
389   // buffer slot has a valid buffer that has been imported already. When no
390   // buffer has been imported before it returns nullptr; otherwise returns a
391   // shared pointer to a BufferConsumer.
GetBuffer(size_t slot)392   std::shared_ptr<BufferConsumer> GetBuffer(size_t slot) const {
393     return std::static_pointer_cast<BufferConsumer>(
394         BufferHubQueue::GetBuffer(slot));
395   }
396 
397   // Import a ConsumerQueue from a channel handle. |ignore_on_import| controls
398   // whether or not buffers are set to be ignored when imported. This may be
399   // used to avoid participation in the buffer lifecycle by a consumer queue
400   // that is only used to spawn other consumer queues, such as in an
401   // intermediate service.
402   static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle,
403                                                bool ignore_on_import = false) {
404     return std::unique_ptr<ConsumerQueue>(
405         new ConsumerQueue(std::move(handle), ignore_on_import));
406   }
407 
408   // Import newly created buffers from the service side.
409   // Returns number of buffers successfully imported or an error.
410   Status<size_t> ImportBuffers();
411 
412   // Dequeue a consumer buffer to read. The returned buffer in |Acquired|'ed
413   // mode, and caller should call Releasse() once it's done writing to release
414   // the buffer to the producer side. |meta| is passed along from BufferHub,
415   // The user of BufferProducer is responsible with making sure that the
416   // Dequeue() is done with the corect metadata type and size with those used
417   // when the buffer is orignally created.
418   template <typename Meta>
Dequeue(int timeout,size_t * slot,Meta * meta,LocalHandle * acquire_fence)419   pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
420       int timeout, size_t* slot, Meta* meta, LocalHandle* acquire_fence) {
421     return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
422   }
Dequeue(int timeout,size_t * slot,LocalHandle * acquire_fence)423   pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
424       int timeout, size_t* slot, LocalHandle* acquire_fence) {
425     return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
426   }
427 
428   pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
429       int timeout, size_t* slot, void* meta, size_t meta_size,
430       LocalHandle* acquire_fence);
431 
432  private:
433   friend BufferHubQueue;
434 
435   ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import = false);
436 
437   // Add a consumer buffer to populate the queue. Once added, a consumer buffer
438   // is NOT available to use until the producer side |Post| it. |WaitForBuffers|
439   // will catch the |Post| and |Acquire| the buffer to make it available for
440   // consumer.
441   int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
442 
443   int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
444                     LocalHandle* acquire_fence) override;
445 
446   Status<void> OnBufferAllocated() override;
447 
448   // Flag indicating that imported (consumer) buffers should be ignored when
449   // imported to avoid participating in the buffer ownership flow.
450   bool ignore_on_import_;
451 };
452 
453 }  // namespace dvr
454 }  // namespace android
455 
456 #endif  // ANDROID_DVR_BUFFER_HUB_QUEUE_CLIENT_H_
457