• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/system/string_data_pipe_producer.h"
6 
7 #include <algorithm>
8 
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/location.h"
12 #include "base/task_scheduler/post_task.h"
13 
14 namespace mojo {
15 
16 namespace {
17 
18 // Attempts to write data to a producer handle. Outputs the actual number of
19 // bytes written in |*size|, and returns a result indicating the status of the
20 // last attempted write operation.
WriteDataToProducerHandle(DataPipeProducerHandle producer,const char * data,size_t * size)21 MojoResult WriteDataToProducerHandle(DataPipeProducerHandle producer,
22                                      const char* data,
23                                      size_t* size) {
24   void* dest;
25   uint32_t bytes_left = static_cast<uint32_t>(*size);
26 
27   // We loop here since the pipe's available capacity may be larger than its
28   // *contiguous* capacity, and hence two independent consecutive two-phase
29   // writes may succeed. The goal here is to write as much of |data| as possible
30   // until we either run out of data or run out of capacity.
31   MojoResult result;
32   do {
33     uint32_t capacity = bytes_left;
34     result =
35         producer.BeginWriteData(&dest, &capacity, MOJO_WRITE_DATA_FLAG_NONE);
36     if (result == MOJO_RESULT_SHOULD_WAIT) {
37       result = MOJO_RESULT_OK;
38       break;
39     } else if (result != MOJO_RESULT_OK) {
40       break;
41     }
42 
43     capacity = std::min(capacity, bytes_left);
44     memcpy(dest, data, capacity);
45     MojoResult end_result = producer.EndWriteData(capacity);
46     DCHECK_EQ(MOJO_RESULT_OK, end_result);
47 
48     data += capacity;
49     bytes_left -= capacity;
50   } while (bytes_left);
51 
52   *size -= bytes_left;
53   return result;
54 }
55 
56 }  // namespace
57 
StringDataPipeProducer(ScopedDataPipeProducerHandle producer)58 StringDataPipeProducer::StringDataPipeProducer(
59     ScopedDataPipeProducerHandle producer)
60     : producer_(std::move(producer)),
61       watcher_(FROM_HERE,
62                SimpleWatcher::ArmingPolicy::AUTOMATIC,
63                base::SequencedTaskRunnerHandle::Get()),
64       weak_factory_(this) {}
65 
66 StringDataPipeProducer::~StringDataPipeProducer() = default;
67 
Write(const base::StringPiece & data,AsyncWritingMode mode,CompletionCallback callback)68 void StringDataPipeProducer::Write(const base::StringPiece& data,
69                                    AsyncWritingMode mode,
70                                    CompletionCallback callback) {
71   DCHECK(!callback_);
72   callback_ = std::move(callback);
73 
74   // Immediately attempt to write data without making an extra copy. If we can
75   // get it all in one shot, we're done aleady.
76   size_t size = data.size();
77   MojoResult result =
78       WriteDataToProducerHandle(producer_.get(), data.data(), &size);
79   if (result == MOJO_RESULT_OK && size == data.size()) {
80     base::SequencedTaskRunnerHandle::Get()->PostTask(
81         FROM_HERE, base::BindOnce(&StringDataPipeProducer::InvokeCallback,
82                                   weak_factory_.GetWeakPtr(), MOJO_RESULT_OK));
83   } else {
84     // Copy whatever data didn't make the cut and try again when the pipe has
85     // some more capacity.
86     if (mode == AsyncWritingMode::STRING_MAY_BE_INVALIDATED_BEFORE_COMPLETION) {
87       data_ = std::string(data.data() + size, data.size() - size);
88       data_view_ = data_;
89     } else {
90       data_view_ = base::StringPiece(data.data() + size, data.size() - size);
91     }
92     watcher_.Watch(producer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
93                    MOJO_WATCH_CONDITION_SATISFIED,
94                    base::Bind(&StringDataPipeProducer::OnProducerHandleReady,
95                               base::Unretained(this)));
96   }
97 }
98 
InvokeCallback(MojoResult result)99 void StringDataPipeProducer::InvokeCallback(MojoResult result) {
100   // May delete |this|.
101   std::move(callback_).Run(result);
102 }
103 
OnProducerHandleReady(MojoResult ready_result,const HandleSignalsState & state)104 void StringDataPipeProducer::OnProducerHandleReady(
105     MojoResult ready_result,
106     const HandleSignalsState& state) {
107   bool failed = false;
108   size_t size = data_view_.size();
109   if (ready_result == MOJO_RESULT_OK) {
110     MojoResult write_result =
111         WriteDataToProducerHandle(producer_.get(), data_view_.data(), &size);
112     if (write_result != MOJO_RESULT_OK)
113       failed = true;
114   } else {
115     failed = true;
116   }
117 
118   if (failed) {
119     watcher_.Cancel();
120 
121     // May delete |this|.
122     std::move(callback_).Run(MOJO_RESULT_ABORTED);
123     return;
124   }
125 
126   if (size == data_view_.size()) {
127     watcher_.Cancel();
128 
129     // May delete |this|.
130     std::move(callback_).Run(MOJO_RESULT_OK);
131     return;
132   }
133 
134   data_view_ =
135       base::StringPiece(data_view_.data() + size, data_view_.size() - size);
136 }
137 
138 }  // namespace mojo
139