• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2019 The Chromium Embedded Framework Authors. All rights
2 // reserved. Use of this source code is governed by a BSD-style license that
3 // can be found in the LICENSE file.
4 
5 #include "libcef/browser/net_service/response_filter_wrapper.h"
6 
7 #include <queue>
8 
9 #include "base/logging.h"
10 #include "mojo/public/cpp/system/simple_watcher.h"
11 #include "mojo/public/cpp/system/string_data_source.h"
12 
13 namespace net_service {
14 
15 namespace {
16 
17 // Match the default |capacity_num_bytes| value from mojo::Core::CreateDataPipe.
18 static const size_t kBufferSize = 64 * 1024;  // 64 Kbytes.
19 static const size_t kMinBufferSpace = 1024;   // 1 Kbytes.
20 
21 class ResponseFilterWrapper {
22  public:
23   ResponseFilterWrapper(CefRefPtr<CefResponseFilter> filter,
24                         mojo::ScopedDataPipeConsumerHandle source_handle,
25                         base::OnceClosure error_callback);
26 
27   ResponseFilterWrapper(const ResponseFilterWrapper&) = delete;
28   ResponseFilterWrapper& operator=(const ResponseFilterWrapper&) = delete;
29 
30   // Creates and returns the output handle, or |source_handle| on failure.
31   bool CreateOutputHandle(mojo::ScopedDataPipeConsumerHandle* output_handle);
32 
33  private:
34   void OnSourceReadable(MojoResult, const mojo::HandleSignalsState&);
35   void Filter(const char* data, size_t size);
36   void Write(std::unique_ptr<std::string> data);
37   void OnWriteComplete(std::unique_ptr<std::string>, MojoResult result);
38   void Drain(bool complete);
39   void MaybeSuccess();
40   void Cleanup(bool success);
41 
42   CefRefPtr<CefResponseFilter> filter_;
43   mojo::ScopedDataPipeConsumerHandle source_handle_;
44   base::OnceClosure error_callback_;
45 
46   std::unique_ptr<mojo::DataPipeProducer> forwarder_;
47   mojo::SimpleWatcher source_watcher_;
48 
49   bool read_pending_ = false;
50   bool write_pending_ = false;
51   std::queue<std::unique_ptr<std::string>> pending_data_;
52   cef_response_filter_status_t last_status_ = RESPONSE_FILTER_NEED_MORE_DATA;
53 };
54 
ResponseFilterWrapper(CefRefPtr<CefResponseFilter> filter,mojo::ScopedDataPipeConsumerHandle source_handle,base::OnceClosure error_callback)55 ResponseFilterWrapper::ResponseFilterWrapper(
56     CefRefPtr<CefResponseFilter> filter,
57     mojo::ScopedDataPipeConsumerHandle source_handle,
58     base::OnceClosure error_callback)
59     : filter_(filter),
60       source_handle_(std::move(source_handle)),
61       error_callback_(std::move(error_callback)),
62       source_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL) {}
63 
CreateOutputHandle(mojo::ScopedDataPipeConsumerHandle * output_handle)64 bool ResponseFilterWrapper::CreateOutputHandle(
65     mojo::ScopedDataPipeConsumerHandle* output_handle) {
66   if (!filter_->InitFilter()) {
67     *output_handle = std::move(source_handle_);
68     return false;
69   }
70 
71   mojo::ScopedDataPipeProducerHandle forwarding_handle;
72   if (CreateDataPipe(nullptr, forwarding_handle, *output_handle) !=
73       MOJO_RESULT_OK) {
74     *output_handle = std::move(source_handle_);
75     return false;
76   }
77 
78   forwarder_ =
79       std::make_unique<mojo::DataPipeProducer>(std::move(forwarding_handle));
80 
81   source_watcher_.Watch(
82       source_handle_.get(),
83       MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
84       MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
85       base::BindRepeating(&ResponseFilterWrapper::OnSourceReadable,
86                           base::Unretained(this)));
87   source_watcher_.ArmOrNotify();
88   read_pending_ = true;
89 
90   return true;
91 }
92 
OnSourceReadable(MojoResult,const mojo::HandleSignalsState &)93 void ResponseFilterWrapper::OnSourceReadable(MojoResult,
94                                              const mojo::HandleSignalsState&) {
95   const void* buffer = nullptr;
96   uint32_t read_bytes = 0;
97   MojoResult result = source_handle_->BeginReadData(&buffer, &read_bytes,
98                                                     MOJO_READ_DATA_FLAG_NONE);
99   if (result == MOJO_RESULT_SHOULD_WAIT) {
100     source_watcher_.ArmOrNotify();
101     return;
102   }
103 
104   if (result != MOJO_RESULT_OK) {
105     // Whole body has been read, or something went wrong.
106     Drain(result == MOJO_RESULT_FAILED_PRECONDITION);
107     return;
108   }
109 
110   Filter(static_cast<const char*>(buffer), read_bytes);
111   if (last_status_ == RESPONSE_FILTER_ERROR) {
112     // Something went wrong.
113     Drain(false);
114     return;
115   }
116 
117   source_handle_->EndReadData(read_bytes);
118   source_watcher_.ArmOrNotify();
119 }
120 
Filter(const char * data,size_t size)121 void ResponseFilterWrapper::Filter(const char* data, size_t size) {
122   size_t data_in_size = size;
123   auto data_in_ptr = data_in_size > 0 ? data : nullptr;
124 
125   size_t data_out_offset = 0;
126   std::unique_ptr<std::string> data_out;
127 
128   while (true) {
129     size_t data_in_read = 0;
130 
131     if (!data_out) {
132       // Start a new buffer. Should have no offset to begin with.
133       DCHECK_EQ(0U, data_out_offset);
134       data_out = std::make_unique<std::string>();
135       data_out->resize(kBufferSize);
136     }
137 
138     auto data_out_ptr = data_out->data() + data_out_offset;
139     size_t data_out_size = kBufferSize - data_out_offset;
140     size_t data_out_written = 0;
141 
142     last_status_ = filter_->Filter(
143         const_cast<char*>(data_in_ptr), data_in_size, data_in_read,
144         const_cast<char*>(data_out_ptr), data_out_size, data_out_written);
145     if (last_status_ == RESPONSE_FILTER_ERROR)
146       break;
147 
148     // Validate the out values.
149     if (data_in_read > data_in_size) {
150       LOG(ERROR) << "potential buffer overflow; data_in_read > data_in_size";
151       last_status_ = RESPONSE_FILTER_ERROR;
152       break;
153     }
154     if (data_out_written > data_out_size) {
155       LOG(ERROR)
156           << "potential buffer overflow; data_out_written > data_out_size";
157       last_status_ = RESPONSE_FILTER_ERROR;
158       break;
159     }
160     if (data_out_written == 0 && data_in_read != data_in_size) {
161       LOG(ERROR) << "when no data is written all input must be consumed; "
162                     "data_out_written == 0 && data_in_read != data_in_size";
163       last_status_ = RESPONSE_FILTER_ERROR;
164       break;
165     }
166 
167     if (data_out_written > 0) {
168       data_out_offset += data_out_written;
169       if (data_out_offset > kBufferSize - kMinBufferSpace) {
170         // The buffer is full or almost full. Write the data that we've
171         // received so far and start a new buffer.
172         data_out->resize(data_out_offset);
173         Write(std::move(data_out));
174         data_out_offset = 0;
175       }
176     }
177 
178     if (data_in_read < data_in_size) {
179       // Keep going until the user reads all data.
180       data_in_ptr += data_in_read;
181       data_in_size -= data_in_read;
182       continue;
183     }
184 
185     // At this point the user has read all data...
186     if (data_in_ptr) {
187       // Clear the input buffer.
188       data_in_read = data_in_size = 0;
189       data_in_ptr = nullptr;
190     }
191 
192     if (data_out_written == data_out_size &&
193         last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
194       // Output buffer was filled, but data is still pending.
195       continue;
196     }
197 
198     if (data_out_offset > 0) {
199       // Write the last of the data that we've received.
200       data_out->resize(data_out_offset);
201       Write(std::move(data_out));
202     }
203 
204     break;
205   }
206 }
207 
Write(std::unique_ptr<std::string> data)208 void ResponseFilterWrapper::Write(std::unique_ptr<std::string> data) {
209   if (write_pending_) {
210     // Only one write at a time is supported.
211     pending_data_.push(std::move(data));
212     return;
213   }
214 
215   write_pending_ = true;
216 
217   base::StringPiece string_piece(*data);
218   forwarder_->Write(std::make_unique<mojo::StringDataSource>(
219                         string_piece, mojo::StringDataSource::AsyncWritingMode::
220                                           STRING_STAYS_VALID_UNTIL_COMPLETION),
221                     base::BindOnce(&ResponseFilterWrapper::OnWriteComplete,
222                                    base::Unretained(this), std::move(data)));
223 }
224 
OnWriteComplete(std::unique_ptr<std::string>,MojoResult result)225 void ResponseFilterWrapper::OnWriteComplete(std::unique_ptr<std::string>,
226                                             MojoResult result) {
227   write_pending_ = false;
228 
229   if (result != MOJO_RESULT_OK) {
230     // Something went wrong.
231     Cleanup(false);
232     return;
233   }
234 
235   MaybeSuccess();
236 }
237 
Drain(bool complete)238 void ResponseFilterWrapper::Drain(bool complete) {
239   read_pending_ = false;
240   source_handle_.reset();
241   source_watcher_.Cancel();
242 
243   if (!complete) {
244     // Something went wrong.
245     Cleanup(false);
246     return;
247   }
248 
249   if (last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
250     // Let the user write any remaining data.
251     Filter(nullptr, 0);
252     if (last_status_ != RESPONSE_FILTER_DONE) {
253       // Something went wrong.
254       Cleanup(false);
255       return;
256     }
257   }
258 
259   MaybeSuccess();
260 }
261 
MaybeSuccess()262 void ResponseFilterWrapper::MaybeSuccess() {
263   if (!write_pending_ && !pending_data_.empty()) {
264     // Write the next data segment.
265     auto next = std::move(pending_data_.front());
266     pending_data_.pop();
267     Write(std::move(next));
268     return;
269   }
270 
271   if (!read_pending_ && !write_pending_)
272     Cleanup(true);
273 }
274 
Cleanup(bool success)275 void ResponseFilterWrapper::Cleanup(bool success) {
276   if (!success && error_callback_)
277     std::move(error_callback_).Run();
278   delete this;
279 }
280 
281 }  // namespace
282 
CreateResponseFilterHandler(CefRefPtr<CefResponseFilter> filter,mojo::ScopedDataPipeConsumerHandle source_handle,base::OnceClosure error_callback)283 mojo::ScopedDataPipeConsumerHandle CreateResponseFilterHandler(
284     CefRefPtr<CefResponseFilter> filter,
285     mojo::ScopedDataPipeConsumerHandle source_handle,
286     base::OnceClosure error_callback) {
287   // |filter_wrapper| will delete itself when filtering is complete if
288   // CreateOutputHandle returns true. Otherwise, it will return the
289   // original |source_handle|.
290   auto filter_wrapper = new ResponseFilterWrapper(
291       filter, std::move(source_handle), std::move(error_callback));
292   mojo::ScopedDataPipeConsumerHandle output_handle;
293   if (!filter_wrapper->CreateOutputHandle(&output_handle))
294     delete filter_wrapper;
295   return output_handle;
296 }
297 
298 }  // namespace net_service
299