1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/string_data_pipe_producer.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/location.h"
12 #include "base/task_scheduler/post_task.h"
13
14 namespace mojo {
15
16 namespace {
17
18 // Attempts to write data to a producer handle. Outputs the actual number of
19 // bytes written in |*size|, and returns a result indicating the status of the
20 // last attempted write operation.
WriteDataToProducerHandle(DataPipeProducerHandle producer,const char * data,size_t * size)21 MojoResult WriteDataToProducerHandle(DataPipeProducerHandle producer,
22 const char* data,
23 size_t* size) {
24 void* dest;
25 uint32_t bytes_left = static_cast<uint32_t>(*size);
26
27 // We loop here since the pipe's available capacity may be larger than its
28 // *contiguous* capacity, and hence two independent consecutive two-phase
29 // writes may succeed. The goal here is to write as much of |data| as possible
30 // until we either run out of data or run out of capacity.
31 MojoResult result;
32 do {
33 uint32_t capacity = bytes_left;
34 result =
35 producer.BeginWriteData(&dest, &capacity, MOJO_WRITE_DATA_FLAG_NONE);
36 if (result == MOJO_RESULT_SHOULD_WAIT) {
37 result = MOJO_RESULT_OK;
38 break;
39 } else if (result != MOJO_RESULT_OK) {
40 break;
41 }
42
43 capacity = std::min(capacity, bytes_left);
44 memcpy(dest, data, capacity);
45 MojoResult end_result = producer.EndWriteData(capacity);
46 DCHECK_EQ(MOJO_RESULT_OK, end_result);
47
48 data += capacity;
49 bytes_left -= capacity;
50 } while (bytes_left);
51
52 *size -= bytes_left;
53 return result;
54 }
55
56 } // namespace
57
StringDataPipeProducer(ScopedDataPipeProducerHandle producer)58 StringDataPipeProducer::StringDataPipeProducer(
59 ScopedDataPipeProducerHandle producer)
60 : producer_(std::move(producer)),
61 watcher_(FROM_HERE,
62 SimpleWatcher::ArmingPolicy::AUTOMATIC,
63 base::SequencedTaskRunnerHandle::Get()),
64 weak_factory_(this) {}
65
66 StringDataPipeProducer::~StringDataPipeProducer() = default;
67
Write(const base::StringPiece & data,AsyncWritingMode mode,CompletionCallback callback)68 void StringDataPipeProducer::Write(const base::StringPiece& data,
69 AsyncWritingMode mode,
70 CompletionCallback callback) {
71 DCHECK(!callback_);
72 callback_ = std::move(callback);
73
74 // Immediately attempt to write data without making an extra copy. If we can
75 // get it all in one shot, we're done aleady.
76 size_t size = data.size();
77 MojoResult result =
78 WriteDataToProducerHandle(producer_.get(), data.data(), &size);
79 if (result == MOJO_RESULT_OK && size == data.size()) {
80 base::SequencedTaskRunnerHandle::Get()->PostTask(
81 FROM_HERE, base::BindOnce(&StringDataPipeProducer::InvokeCallback,
82 weak_factory_.GetWeakPtr(), MOJO_RESULT_OK));
83 } else {
84 // Copy whatever data didn't make the cut and try again when the pipe has
85 // some more capacity.
86 if (mode == AsyncWritingMode::STRING_MAY_BE_INVALIDATED_BEFORE_COMPLETION) {
87 data_ = std::string(data.data() + size, data.size() - size);
88 data_view_ = data_;
89 } else {
90 data_view_ = base::StringPiece(data.data() + size, data.size() - size);
91 }
92 watcher_.Watch(producer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
93 MOJO_WATCH_CONDITION_SATISFIED,
94 base::Bind(&StringDataPipeProducer::OnProducerHandleReady,
95 base::Unretained(this)));
96 }
97 }
98
InvokeCallback(MojoResult result)99 void StringDataPipeProducer::InvokeCallback(MojoResult result) {
100 // May delete |this|.
101 std::move(callback_).Run(result);
102 }
103
OnProducerHandleReady(MojoResult ready_result,const HandleSignalsState & state)104 void StringDataPipeProducer::OnProducerHandleReady(
105 MojoResult ready_result,
106 const HandleSignalsState& state) {
107 bool failed = false;
108 size_t size = data_view_.size();
109 if (ready_result == MOJO_RESULT_OK) {
110 MojoResult write_result =
111 WriteDataToProducerHandle(producer_.get(), data_view_.data(), &size);
112 if (write_result != MOJO_RESULT_OK)
113 failed = true;
114 } else {
115 failed = true;
116 }
117
118 if (failed) {
119 watcher_.Cancel();
120
121 // May delete |this|.
122 std::move(callback_).Run(MOJO_RESULT_ABORTED);
123 return;
124 }
125
126 if (size == data_view_.size()) {
127 watcher_.Cancel();
128
129 // May delete |this|.
130 std::move(callback_).Run(MOJO_RESULT_OK);
131 return;
132 }
133
134 data_view_ =
135 base::StringPiece(data_view_.data() + size, data_view_.size() - size);
136 }
137
138 } // namespace mojo
139