1 // Copyright (c) 2019 The Chromium Embedded Framework Authors. All rights
2 // reserved. Use of this source code is governed by a BSD-style license that
3 // can be found in the LICENSE file.
4
5 #include "libcef/browser/net_service/response_filter_wrapper.h"
6
7 #include <queue>
8
9 #include "base/logging.h"
10 #include "mojo/public/cpp/system/simple_watcher.h"
11 #include "mojo/public/cpp/system/string_data_source.h"
12
13 namespace net_service {
14
15 namespace {
16
17 // Match the default |capacity_num_bytes| value from mojo::Core::CreateDataPipe.
18 static const size_t kBufferSize = 64 * 1024; // 64 Kbytes.
19 static const size_t kMinBufferSpace = 1024; // 1 Kbytes.
20
21 class ResponseFilterWrapper {
22 public:
23 ResponseFilterWrapper(CefRefPtr<CefResponseFilter> filter,
24 mojo::ScopedDataPipeConsumerHandle source_handle,
25 base::OnceClosure error_callback);
26
27 ResponseFilterWrapper(const ResponseFilterWrapper&) = delete;
28 ResponseFilterWrapper& operator=(const ResponseFilterWrapper&) = delete;
29
30 // Creates and returns the output handle, or |source_handle| on failure.
31 bool CreateOutputHandle(mojo::ScopedDataPipeConsumerHandle* output_handle);
32
33 private:
34 void OnSourceReadable(MojoResult, const mojo::HandleSignalsState&);
35 void Filter(const char* data, size_t size);
36 void Write(std::unique_ptr<std::string> data);
37 void OnWriteComplete(std::unique_ptr<std::string>, MojoResult result);
38 void Drain(bool complete);
39 void MaybeSuccess();
40 void Cleanup(bool success);
41
42 CefRefPtr<CefResponseFilter> filter_;
43 mojo::ScopedDataPipeConsumerHandle source_handle_;
44 base::OnceClosure error_callback_;
45
46 std::unique_ptr<mojo::DataPipeProducer> forwarder_;
47 mojo::SimpleWatcher source_watcher_;
48
49 bool read_pending_ = false;
50 bool write_pending_ = false;
51 std::queue<std::unique_ptr<std::string>> pending_data_;
52 cef_response_filter_status_t last_status_ = RESPONSE_FILTER_NEED_MORE_DATA;
53 };
54
ResponseFilterWrapper(CefRefPtr<CefResponseFilter> filter,mojo::ScopedDataPipeConsumerHandle source_handle,base::OnceClosure error_callback)55 ResponseFilterWrapper::ResponseFilterWrapper(
56 CefRefPtr<CefResponseFilter> filter,
57 mojo::ScopedDataPipeConsumerHandle source_handle,
58 base::OnceClosure error_callback)
59 : filter_(filter),
60 source_handle_(std::move(source_handle)),
61 error_callback_(std::move(error_callback)),
62 source_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL) {}
63
CreateOutputHandle(mojo::ScopedDataPipeConsumerHandle * output_handle)64 bool ResponseFilterWrapper::CreateOutputHandle(
65 mojo::ScopedDataPipeConsumerHandle* output_handle) {
66 if (!filter_->InitFilter()) {
67 *output_handle = std::move(source_handle_);
68 return false;
69 }
70
71 mojo::ScopedDataPipeProducerHandle forwarding_handle;
72 if (CreateDataPipe(nullptr, forwarding_handle, *output_handle) !=
73 MOJO_RESULT_OK) {
74 *output_handle = std::move(source_handle_);
75 return false;
76 }
77
78 forwarder_ =
79 std::make_unique<mojo::DataPipeProducer>(std::move(forwarding_handle));
80
81 source_watcher_.Watch(
82 source_handle_.get(),
83 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
84 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
85 base::BindRepeating(&ResponseFilterWrapper::OnSourceReadable,
86 base::Unretained(this)));
87 source_watcher_.ArmOrNotify();
88 read_pending_ = true;
89
90 return true;
91 }
92
OnSourceReadable(MojoResult,const mojo::HandleSignalsState &)93 void ResponseFilterWrapper::OnSourceReadable(MojoResult,
94 const mojo::HandleSignalsState&) {
95 const void* buffer = nullptr;
96 uint32_t read_bytes = 0;
97 MojoResult result = source_handle_->BeginReadData(&buffer, &read_bytes,
98 MOJO_READ_DATA_FLAG_NONE);
99 if (result == MOJO_RESULT_SHOULD_WAIT) {
100 source_watcher_.ArmOrNotify();
101 return;
102 }
103
104 if (result != MOJO_RESULT_OK) {
105 // Whole body has been read, or something went wrong.
106 Drain(result == MOJO_RESULT_FAILED_PRECONDITION);
107 return;
108 }
109
110 Filter(static_cast<const char*>(buffer), read_bytes);
111 if (last_status_ == RESPONSE_FILTER_ERROR) {
112 // Something went wrong.
113 Drain(false);
114 return;
115 }
116
117 source_handle_->EndReadData(read_bytes);
118 source_watcher_.ArmOrNotify();
119 }
120
Filter(const char * data,size_t size)121 void ResponseFilterWrapper::Filter(const char* data, size_t size) {
122 size_t data_in_size = size;
123 auto data_in_ptr = data_in_size > 0 ? data : nullptr;
124
125 size_t data_out_offset = 0;
126 std::unique_ptr<std::string> data_out;
127
128 while (true) {
129 size_t data_in_read = 0;
130
131 if (!data_out) {
132 // Start a new buffer. Should have no offset to begin with.
133 DCHECK_EQ(0U, data_out_offset);
134 data_out = std::make_unique<std::string>();
135 data_out->resize(kBufferSize);
136 }
137
138 auto data_out_ptr = data_out->data() + data_out_offset;
139 size_t data_out_size = kBufferSize - data_out_offset;
140 size_t data_out_written = 0;
141
142 last_status_ = filter_->Filter(
143 const_cast<char*>(data_in_ptr), data_in_size, data_in_read,
144 const_cast<char*>(data_out_ptr), data_out_size, data_out_written);
145 if (last_status_ == RESPONSE_FILTER_ERROR)
146 break;
147
148 // Validate the out values.
149 if (data_in_read > data_in_size) {
150 LOG(ERROR) << "potential buffer overflow; data_in_read > data_in_size";
151 last_status_ = RESPONSE_FILTER_ERROR;
152 break;
153 }
154 if (data_out_written > data_out_size) {
155 LOG(ERROR)
156 << "potential buffer overflow; data_out_written > data_out_size";
157 last_status_ = RESPONSE_FILTER_ERROR;
158 break;
159 }
160 if (data_out_written == 0 && data_in_read != data_in_size) {
161 LOG(ERROR) << "when no data is written all input must be consumed; "
162 "data_out_written == 0 && data_in_read != data_in_size";
163 last_status_ = RESPONSE_FILTER_ERROR;
164 break;
165 }
166
167 if (data_out_written > 0) {
168 data_out_offset += data_out_written;
169 if (data_out_offset > kBufferSize - kMinBufferSpace) {
170 // The buffer is full or almost full. Write the data that we've
171 // received so far and start a new buffer.
172 data_out->resize(data_out_offset);
173 Write(std::move(data_out));
174 data_out_offset = 0;
175 }
176 }
177
178 if (data_in_read < data_in_size) {
179 // Keep going until the user reads all data.
180 data_in_ptr += data_in_read;
181 data_in_size -= data_in_read;
182 continue;
183 }
184
185 // At this point the user has read all data...
186 if (data_in_ptr) {
187 // Clear the input buffer.
188 data_in_read = data_in_size = 0;
189 data_in_ptr = nullptr;
190 }
191
192 if (data_out_written == data_out_size &&
193 last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
194 // Output buffer was filled, but data is still pending.
195 continue;
196 }
197
198 if (data_out_offset > 0) {
199 // Write the last of the data that we've received.
200 data_out->resize(data_out_offset);
201 Write(std::move(data_out));
202 }
203
204 break;
205 }
206 }
207
Write(std::unique_ptr<std::string> data)208 void ResponseFilterWrapper::Write(std::unique_ptr<std::string> data) {
209 if (write_pending_) {
210 // Only one write at a time is supported.
211 pending_data_.push(std::move(data));
212 return;
213 }
214
215 write_pending_ = true;
216
217 base::StringPiece string_piece(*data);
218 forwarder_->Write(std::make_unique<mojo::StringDataSource>(
219 string_piece, mojo::StringDataSource::AsyncWritingMode::
220 STRING_STAYS_VALID_UNTIL_COMPLETION),
221 base::BindOnce(&ResponseFilterWrapper::OnWriteComplete,
222 base::Unretained(this), std::move(data)));
223 }
224
OnWriteComplete(std::unique_ptr<std::string>,MojoResult result)225 void ResponseFilterWrapper::OnWriteComplete(std::unique_ptr<std::string>,
226 MojoResult result) {
227 write_pending_ = false;
228
229 if (result != MOJO_RESULT_OK) {
230 // Something went wrong.
231 Cleanup(false);
232 return;
233 }
234
235 MaybeSuccess();
236 }
237
Drain(bool complete)238 void ResponseFilterWrapper::Drain(bool complete) {
239 read_pending_ = false;
240 source_handle_.reset();
241 source_watcher_.Cancel();
242
243 if (!complete) {
244 // Something went wrong.
245 Cleanup(false);
246 return;
247 }
248
249 if (last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
250 // Let the user write any remaining data.
251 Filter(nullptr, 0);
252 if (last_status_ != RESPONSE_FILTER_DONE) {
253 // Something went wrong.
254 Cleanup(false);
255 return;
256 }
257 }
258
259 MaybeSuccess();
260 }
261
MaybeSuccess()262 void ResponseFilterWrapper::MaybeSuccess() {
263 if (!write_pending_ && !pending_data_.empty()) {
264 // Write the next data segment.
265 auto next = std::move(pending_data_.front());
266 pending_data_.pop();
267 Write(std::move(next));
268 return;
269 }
270
271 if (!read_pending_ && !write_pending_)
272 Cleanup(true);
273 }
274
Cleanup(bool success)275 void ResponseFilterWrapper::Cleanup(bool success) {
276 if (!success && error_callback_)
277 std::move(error_callback_).Run();
278 delete this;
279 }
280
281 } // namespace
282
CreateResponseFilterHandler(CefRefPtr<CefResponseFilter> filter,mojo::ScopedDataPipeConsumerHandle source_handle,base::OnceClosure error_callback)283 mojo::ScopedDataPipeConsumerHandle CreateResponseFilterHandler(
284 CefRefPtr<CefResponseFilter> filter,
285 mojo::ScopedDataPipeConsumerHandle source_handle,
286 base::OnceClosure error_callback) {
287 // |filter_wrapper| will delete itself when filtering is complete if
288 // CreateOutputHandle returns true. Otherwise, it will return the
289 // original |source_handle|.
290 auto filter_wrapper = new ResponseFilterWrapper(
291 filter, std::move(source_handle), std::move(error_callback));
292 mojo::ScopedDataPipeConsumerHandle output_handle;
293 if (!filter_wrapper->CreateOutputHandle(&output_handle))
294 delete filter_wrapper;
295 return output_handle;
296 }
297
298 } // namespace net_service
299