• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions
2 // Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this
3 // source code is governed by a BSD-style license that can be found in the
4 // LICENSE file.
5 
6 #include "libcef/browser/net_service/stream_reader_url_loader.h"
7 
8 #include "libcef/browser/thread_util.h"
9 #include "libcef/common/net_service/net_service_util.h"
10 
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/task/post_task.h"
17 #include "base/task/thread_pool.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_task_runner_handle.h"
20 #include "content/public/browser/browser_thread.h"
21 #include "net/base/io_buffer.h"
22 #include "net/http/http_status_code.h"
23 #include "net/http/http_util.h"
24 #include "services/network/public/cpp/url_loader_completion_status.h"
25 
26 namespace net_service {
27 
28 namespace {
29 
30 using OnInputStreamOpenedCallback =
31     base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>,
32                             std::unique_ptr<InputStream>)>;
33 
34 // Helper for executing the OnInputStreamOpenedCallback.
35 class OpenInputStreamWrapper
36     : public base::RefCountedThreadSafe<OpenInputStreamWrapper> {
37  public:
38   OpenInputStreamWrapper(const OpenInputStreamWrapper&) = delete;
39   OpenInputStreamWrapper& operator=(const OpenInputStreamWrapper&) = delete;
40 
Open(std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int32_t request_id,const network::ResourceRequest & request,OnInputStreamOpenedCallback callback)41   static base::OnceClosure Open(
42       std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
43       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
44       int32_t request_id,
45       const network::ResourceRequest& request,
46       OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
47     scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
48         std::move(delegate), work_thread_task_runner,
49         base::ThreadTaskRunnerHandle::Get(), std::move(callback));
50     wrapper->Start(request_id, request);
51 
52     return wrapper->GetCancelCallback();
53   }
54 
55  private:
56   friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>;
57 
OpenInputStreamWrapper(std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,OnInputStreamOpenedCallback callback)58   OpenInputStreamWrapper(
59       std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
60       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
61       scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
62       OnInputStreamOpenedCallback callback)
63       : delegate_(std::move(delegate)),
64         work_thread_task_runner_(work_thread_task_runner),
65         job_thread_task_runner_(job_thread_task_runner),
66         callback_(std::move(callback)) {}
67   virtual ~OpenInputStreamWrapper() = default;
68 
Start(int32_t request_id,const network::ResourceRequest & request)69   void Start(int32_t request_id, const network::ResourceRequest& request) {
70     work_thread_task_runner_->PostTask(
71         FROM_HERE,
72         base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread,
73                        base::WrapRefCounted(this), request_id, request));
74   }
75 
GetCancelCallback()76   base::OnceClosure GetCancelCallback() {
77     return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread,
78                           base::WrapRefCounted(this));
79   }
80 
CancelOnJobThread()81   void CancelOnJobThread() {
82     DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
83     if (callback_.is_null())
84       return;
85 
86     callback_.Reset();
87 
88     work_thread_task_runner_->PostTask(
89         FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread,
90                                   base::WrapRefCounted(this)));
91   }
92 
CancelOnWorkThread()93   void CancelOnWorkThread() {
94     DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
95     if (is_canceled_)
96       return;
97     is_canceled_ = true;
98     OnCallback(nullptr);
99   }
100 
OpenOnWorkThread(int32_t request_id,const network::ResourceRequest & request)101   void OpenOnWorkThread(int32_t request_id,
102                         const network::ResourceRequest& request) {
103     DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
104     if (is_canceled_)
105       return;
106 
107     // |delegate_| will remain valid until OnCallback() is executed on
108     // |job_thread_task_runner_|.
109     if (!delegate_->OpenInputStream(
110             request_id, request,
111             base::BindOnce(&OpenInputStreamWrapper::OnCallback,
112                            base::WrapRefCounted(this)))) {
113       is_canceled_ = true;
114       OnCallback(nullptr);
115     }
116   }
117 
OnCallback(std::unique_ptr<InputStream> input_stream)118   void OnCallback(std::unique_ptr<InputStream> input_stream) {
119     if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) {
120       job_thread_task_runner_->PostTask(
121           FROM_HERE,
122           base::BindOnce(&OpenInputStreamWrapper::OnCallback,
123                          base::WrapRefCounted(this), std::move(input_stream)));
124       return;
125     }
126 
127     // May be null if CancelOnJobThread() was called on
128     // |job_thread_task_runner_| while OpenOnWorkThread() was pending on
129     // |work_thread_task_runner_|.
130     if (callback_.is_null()) {
131       delegate_.reset();
132       return;
133     }
134 
135     std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
136   }
137 
138   std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
139 
140   scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
141   scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
142 
143   // Only accessed on |job_thread_task_runner_|.
144   OnInputStreamOpenedCallback callback_;
145 
146   // Only accessed on |work_thread_task_runner_|.
147   bool is_canceled_ = false;
148 };
149 
150 }  // namespace
151 
152 //==============================
153 // InputStreamReader
154 //=============================
155 
156 // Class responsible for reading from the InputStream.
157 class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> {
158  public:
159   // The constructor is called on the IO thread, not on the worker thread.
160   // Callbacks will be executed on the IO thread.
161   InputStreamReader(
162       std::unique_ptr<InputStream> stream,
163       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner);
164 
165   InputStreamReader(const InputStreamReader&) = delete;
166   InputStreamReader& operator=(const InputStreamReader&) = delete;
167 
168   // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be
169   // executed asynchronously on the IO thread. A negative value passed to
170   // |callback| will indicate an error code, a positive value will indicate the
171   // number of bytes skipped.
172   void Skip(int64_t skip_bytes, InputStream::SkipCallback callback);
173 
174   // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be
175   // executed asynchronously on the IO thread. A negative value passed to
176   // |callback| will indicate an error code, a positive value will indicate the
177   // number of bytes read.
178   void Read(scoped_refptr<net::IOBuffer> dest,
179             int dest_size,
180             InputStream::ReadCallback callback);
181 
182  private:
183   friend class base::RefCountedThreadSafe<InputStreamReader>;
184   virtual ~InputStreamReader();
185 
186   void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback);
187   void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer,
188                         int buffer_size,
189                         InputStream::ReadCallback callback);
190 
191   void SkipToRequestedRange();
192 
193   static void ContinueSkipCallback(
194       scoped_refptr<InputStreamReader> stream,
195       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
196       int callback_id,
197       int64_t bytes_skipped);
198   static void ContinueReadCallback(
199       scoped_refptr<InputStreamReader> stream,
200       scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
201       int callback_id,
202       int bytes_read);
203 
204   void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped);
205   void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read);
206 
207   void RunSkipCallback(int64_t bytes_skipped);
208   void RunReadCallback(int bytes_read);
209 
210   static void RunSkipCallbackOnJobThread(
211       int64_t bytes_skipped,
212       InputStream::SkipCallback skip_callback);
213   static void RunReadCallbackOnJobThread(
214       int bytes_read,
215       InputStream::ReadCallback read_callback);
216 
217   std::unique_ptr<InputStream> stream_;
218 
219   // All InputStream methods are called this task runner.
220   scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
221 
222   // All callbacks are executed on this task runner.
223   scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
224 
225   // The below members are only accessed on the work thread.
226   int64_t bytes_skipped_;
227   int64_t bytes_to_skip_;
228   InputStream::SkipCallback pending_skip_callback_;
229 
230   scoped_refptr<net::IOBuffer> buffer_;
231   InputStream::ReadCallback pending_read_callback_;
232 
233   int pending_callback_id_ = -1;
234 
235   int next_callback_id_ = 0;
236 };
237 
InputStreamReader(std::unique_ptr<InputStream> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)238 InputStreamReader::InputStreamReader(
239     std::unique_ptr<InputStream> stream,
240     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)
241     : stream_(std::move(stream)),
242       work_thread_task_runner_(work_thread_task_runner),
243       job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
244   CEF_REQUIRE_IOT();
245   DCHECK(stream_);
246   DCHECK(work_thread_task_runner_);
247 }
248 
~InputStreamReader()249 InputStreamReader::~InputStreamReader() {}
250 
Skip(int64_t skip_bytes,InputStream::SkipCallback callback)251 void InputStreamReader::Skip(int64_t skip_bytes,
252                              InputStream::SkipCallback callback) {
253   work_thread_task_runner_->PostTask(
254       FROM_HERE, base::BindOnce(&InputStreamReader::SkipOnWorkThread,
255                                 base::WrapRefCounted(this), skip_bytes,
256                                 std::move(callback)));
257 }
258 
Read(scoped_refptr<net::IOBuffer> dest,int dest_size,InputStream::ReadCallback callback)259 void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest,
260                              int dest_size,
261                              InputStream::ReadCallback callback) {
262   work_thread_task_runner_->PostTask(
263       FROM_HERE, base::BindOnce(&InputStreamReader::ReadOnWorkThread,
264                                 base::WrapRefCounted(this), dest, dest_size,
265                                 std::move(callback)));
266 }
267 
SkipOnWorkThread(int64_t skip_bytes,InputStream::SkipCallback callback)268 void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes,
269                                          InputStream::SkipCallback callback) {
270   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
271 
272   // No callback should currently be pending.
273   DCHECK_EQ(pending_callback_id_, -1);
274   DCHECK(pending_skip_callback_.is_null());
275 
276   pending_skip_callback_ = std::move(callback);
277 
278   if (skip_bytes <= 0) {
279     RunSkipCallback(0);
280     return;
281   }
282 
283   bytes_skipped_ = bytes_to_skip_ = skip_bytes;
284   SkipToRequestedRange();
285 }
286 
ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,int dest_size,InputStream::ReadCallback callback)287 void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,
288                                          int dest_size,
289                                          InputStream::ReadCallback callback) {
290   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
291 
292   // No callback should currently be pending.
293   DCHECK_EQ(pending_callback_id_, -1);
294   DCHECK(pending_read_callback_.is_null());
295 
296   pending_read_callback_ = std::move(callback);
297 
298   if (!dest_size) {
299     RunReadCallback(0);
300     return;
301   }
302 
303   DCHECK_GT(dest_size, 0);
304 
305   buffer_ = dest;
306   pending_callback_id_ = ++next_callback_id_;
307 
308   int bytes_read = 0;
309   bool result = stream_->Read(
310       buffer_.get(), dest_size, &bytes_read,
311       base::BindOnce(&InputStreamReader::ContinueReadCallback,
312                      base::WrapRefCounted(this), work_thread_task_runner_,
313                      pending_callback_id_));
314 
315   // Check if the callback will execute asynchronously.
316   if (result && bytes_read == 0)
317     return;
318 
319   RunReadCallback(result || bytes_read <= 0 ? bytes_read : net::ERR_FAILED);
320 }
321 
SkipToRequestedRange()322 void InputStreamReader::SkipToRequestedRange() {
323   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
324 
325   // Skip to the start of the requested data. This has to be done in a loop
326   // because the underlying InputStream is not guaranteed to skip the requested
327   // number of bytes.
328   do {
329     pending_callback_id_ = ++next_callback_id_;
330 
331     int64_t skipped = 0;
332     bool result = stream_->Skip(
333         bytes_to_skip_, &skipped,
334         base::BindOnce(&InputStreamReader::ContinueSkipCallback,
335                        base::WrapRefCounted(this), work_thread_task_runner_,
336                        pending_callback_id_));
337 
338     // Check if the callback will execute asynchronously.
339     if (result && skipped == 0)
340       return;
341 
342     if (!result || skipped <= 0) {
343       RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
344       return;
345     }
346     DCHECK_LE(skipped, bytes_to_skip_);
347 
348     bytes_to_skip_ -= skipped;
349   } while (bytes_to_skip_ > 0);
350 
351   // All done, the requested number of bytes were skipped.
352   RunSkipCallback(bytes_skipped_);
353 }
354 
355 // static
ContinueSkipCallback(scoped_refptr<InputStreamReader> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int callback_id,int64_t bytes_skipped)356 void InputStreamReader::ContinueSkipCallback(
357     scoped_refptr<InputStreamReader> stream,
358     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
359     int callback_id,
360     int64_t bytes_skipped) {
361   // Always execute asynchronously.
362   work_thread_task_runner->PostTask(
363       FROM_HERE,
364       base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread,
365                      stream, callback_id, bytes_skipped));
366 }
367 
368 // static
ContinueReadCallback(scoped_refptr<InputStreamReader> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int callback_id,int bytes_read)369 void InputStreamReader::ContinueReadCallback(
370     scoped_refptr<InputStreamReader> stream,
371     scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
372     int callback_id,
373     int bytes_read) {
374   // Always execute asynchronously.
375   work_thread_task_runner->PostTask(
376       FROM_HERE,
377       base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread,
378                      stream, callback_id, bytes_read));
379 }
380 
ContinueSkipCallbackOnWorkThread(int callback_id,int64_t bytes_skipped)381 void InputStreamReader::ContinueSkipCallbackOnWorkThread(
382     int callback_id,
383     int64_t bytes_skipped) {
384   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
385 
386   // Check for out of order callbacks.
387   if (pending_callback_id_ != callback_id)
388     return;
389 
390   DCHECK_LE(bytes_skipped, bytes_to_skip_);
391 
392   if (bytes_to_skip_ > 0 && bytes_skipped > 0)
393     bytes_to_skip_ -= bytes_skipped;
394 
395   if (bytes_skipped <= 0) {
396     RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
397   } else if (bytes_to_skip_ > 0) {
398     // Continue execution asynchronously.
399     work_thread_task_runner_->PostTask(
400         FROM_HERE,
401         base::BindOnce(&InputStreamReader::SkipToRequestedRange, this));
402   } else {
403     // All done, the requested number of bytes were skipped.
404     RunSkipCallback(bytes_skipped_);
405   }
406 }
407 
ContinueReadCallbackOnWorkThread(int callback_id,int bytes_read)408 void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id,
409                                                          int bytes_read) {
410   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
411 
412   // Check for out of order callbacks.
413   if (pending_callback_id_ != callback_id)
414     return;
415 
416   RunReadCallback(bytes_read);
417 }
418 
RunSkipCallback(int64_t bytes_skipped)419 void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) {
420   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
421 
422   DCHECK(!pending_skip_callback_.is_null());
423   job_thread_task_runner_->PostTask(
424       FROM_HERE,
425       base::BindOnce(InputStreamReader::RunSkipCallbackOnJobThread,
426                      bytes_skipped, std::move(pending_skip_callback_)));
427 
428   // Reset callback state.
429   pending_callback_id_ = -1;
430   bytes_skipped_ = bytes_to_skip_ = -1;
431 }
432 
RunReadCallback(int bytes_read)433 void InputStreamReader::RunReadCallback(int bytes_read) {
434   DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
435 
436   DCHECK(!pending_read_callback_.is_null());
437   job_thread_task_runner_->PostTask(
438       FROM_HERE, base::BindOnce(InputStreamReader::RunReadCallbackOnJobThread,
439                                 bytes_read, std::move(pending_read_callback_)));
440 
441   // Reset callback state.
442   pending_callback_id_ = -1;
443   buffer_ = nullptr;
444 }
445 
446 // static
RunSkipCallbackOnJobThread(int64_t bytes_skipped,InputStream::SkipCallback skip_callback)447 void InputStreamReader::RunSkipCallbackOnJobThread(
448     int64_t bytes_skipped,
449     InputStream::SkipCallback skip_callback) {
450   std::move(skip_callback).Run(bytes_skipped);
451 }
452 
453 // static
RunReadCallbackOnJobThread(int bytes_read,InputStream::ReadCallback read_callback)454 void InputStreamReader::RunReadCallbackOnJobThread(
455     int bytes_read,
456     InputStream::ReadCallback read_callback) {
457   std::move(read_callback).Run(bytes_read);
458 }
459 
460 //==============================
461 // StreamReaderURLLoader
462 //=============================
463 
StreamReaderURLLoader(int32_t request_id,const network::ResourceRequest & request,mojo::PendingRemote<network::mojom::URLLoaderClient> client,mojo::PendingRemote<network::mojom::TrustedHeaderClient> header_client,const net::MutableNetworkTrafficAnnotationTag & traffic_annotation,std::unique_ptr<Delegate> response_delegate)464 StreamReaderURLLoader::StreamReaderURLLoader(
465     int32_t request_id,
466     const network::ResourceRequest& request,
467     mojo::PendingRemote<network::mojom::URLLoaderClient> client,
468     mojo::PendingRemote<network::mojom::TrustedHeaderClient> header_client,
469     const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
470     std::unique_ptr<Delegate> response_delegate)
471     : request_id_(request_id),
472       request_(request),
473       client_(std::move(client)),
474       header_client_(std::move(header_client)),
475       traffic_annotation_(traffic_annotation),
476       response_delegate_(std::move(response_delegate)),
477       writable_handle_watcher_(FROM_HERE,
478                                mojo::SimpleWatcher::ArmingPolicy::MANUAL,
479                                base::SequencedTaskRunnerHandle::Get()),
480       weak_factory_(this) {
481   DCHECK(response_delegate_);
482   // If there is a client error, clean up the request.
483   client_.set_disconnect_handler(
484       base::BindOnce(&StreamReaderURLLoader::RequestComplete,
485                      weak_factory_.GetWeakPtr(), net::ERR_ABORTED));
486 
487   // All InputStream work will be performed on this task runner.
488   stream_work_task_runner_ =
489       base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()});
490 }
491 
~StreamReaderURLLoader()492 StreamReaderURLLoader::~StreamReaderURLLoader() {
493   if (open_cancel_callback_) {
494     // Release the Delegate held by OpenInputStreamWrapper.
495     std::move(open_cancel_callback_).Run();
496   }
497 }
498 
Start()499 void StreamReaderURLLoader::Start() {
500   DCHECK(thread_checker_.CalledOnValidThread());
501 
502   if (!ParseRange(request_.headers)) {
503     RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
504     return;
505   }
506 
507   if (header_client_.is_bound()) {
508     header_client_->OnBeforeSendHeaders(
509         request_.headers,
510         base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders,
511                        weak_factory_.GetWeakPtr()));
512   } else {
513     ContinueWithRequestHeaders(net::OK, absl::nullopt);
514   }
515 }
516 
ContinueWithRequestHeaders(int32_t result,const absl::optional<net::HttpRequestHeaders> & headers)517 void StreamReaderURLLoader::ContinueWithRequestHeaders(
518     int32_t result,
519     const absl::optional<net::HttpRequestHeaders>& headers) {
520   if (result != net::OK) {
521     RequestComplete(result);
522     return;
523   }
524 
525   if (headers) {
526     DCHECK(header_client_.is_bound());
527     request_.headers = *headers;
528   }
529 
530   open_cancel_callback_ = OpenInputStreamWrapper::Open(
531       // This is intentional - the loader could be deleted while
532       // the callback is executing on the background thread. The
533       // delegate will be "returned" to the loader once the
534       // InputStream open attempt is completed.
535       std::move(response_delegate_), stream_work_task_runner_, request_id_,
536       request_,
537       base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened,
538                      weak_factory_.GetWeakPtr()));
539 }
540 
FollowRedirect(const std::vector<std::string> & removed_headers,const net::HttpRequestHeaders & modified_headers,const net::HttpRequestHeaders & modified_cors_exempt_headers,const absl::optional<GURL> & new_url)541 void StreamReaderURLLoader::FollowRedirect(
542     const std::vector<std::string>& removed_headers,
543     const net::HttpRequestHeaders& modified_headers,
544     const net::HttpRequestHeaders& modified_cors_exempt_headers,
545     const absl::optional<GURL>& new_url) {
546   NOTREACHED();
547 }
548 
SetPriority(net::RequestPriority priority,int intra_priority_value)549 void StreamReaderURLLoader::SetPriority(net::RequestPriority priority,
550                                         int intra_priority_value) {}
551 
PauseReadingBodyFromNet()552 void StreamReaderURLLoader::PauseReadingBodyFromNet() {}
553 
ResumeReadingBodyFromNet()554 void StreamReaderURLLoader::ResumeReadingBodyFromNet() {}
555 
OnInputStreamOpened(std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,std::unique_ptr<InputStream> input_stream)556 void StreamReaderURLLoader::OnInputStreamOpened(
557     std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,
558     std::unique_ptr<InputStream> input_stream) {
559   DCHECK(thread_checker_.CalledOnValidThread());
560   DCHECK(returned_delegate);
561   response_delegate_ = std::move(returned_delegate);
562   open_cancel_callback_.Reset();
563 
564   if (!input_stream) {
565     bool restarted = false;
566     response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted);
567     if (restarted) {
568       // The request has been restarted with a new loader.
569       // |this| will be deleted.
570       CleanUp();
571     } else {
572       HeadersComplete(net::HTTP_NOT_FOUND, -1);
573     }
574     return;
575   }
576 
577   input_stream_reader_ = base::MakeRefCounted<InputStreamReader>(
578       std::move(input_stream), stream_work_task_runner_);
579 
580   if (!byte_range_valid()) {
581     OnReaderSkipCompleted(0);
582   } else {
583     input_stream_reader_->Skip(
584         byte_range_.first_byte_position(),
585         base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted,
586                        weak_factory_.GetWeakPtr()));
587   }
588 }
589 
OnReaderSkipCompleted(int64_t bytes_skipped)590 void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) {
591   DCHECK(thread_checker_.CalledOnValidThread());
592 
593   if (!byte_range_valid()) {
594     // Expected content length is unspecified.
595     HeadersComplete(net::HTTP_OK, -1);
596   } else if (bytes_skipped == byte_range_.first_byte_position()) {
597     // We skipped the expected number of bytes.
598     int64_t expected_content_length = -1;
599     if (byte_range_.HasLastBytePosition()) {
600       expected_content_length = byte_range_.last_byte_position() -
601                                 byte_range_.first_byte_position() + 1;
602       DCHECK_GE(expected_content_length, 0);
603     }
604     HeadersComplete(net::HTTP_OK, expected_content_length);
605   } else {
606     RequestComplete(bytes_skipped < 0 ? bytes_skipped : net::ERR_FAILED);
607   }
608 }
609 
HeadersComplete(int orig_status_code,int64_t expected_content_length)610 void StreamReaderURLLoader::HeadersComplete(int orig_status_code,
611                                             int64_t expected_content_length) {
612   DCHECK(thread_checker_.CalledOnValidThread());
613 
614   int status_code = orig_status_code;
615   std::string status_text =
616       net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code));
617   std::string mime_type, charset;
618   int64_t content_length = expected_content_length;
619   ResourceResponse::HeaderMap extra_headers;
620   response_delegate_->GetResponseHeaders(request_id_, &status_code,
621                                          &status_text, &mime_type, &charset,
622                                          &content_length, &extra_headers);
623 
624   if (status_code < 0) {
625     // Early exit if the handler reported an error.
626     RequestComplete(status_code);
627     return;
628   }
629 
630   auto pending_response = network::mojom::URLResponseHead::New();
631   pending_response->request_start = base::TimeTicks::Now();
632   pending_response->response_start = base::TimeTicks::Now();
633 
634   auto headers = MakeResponseHeaders(
635       status_code, status_text, mime_type, charset, content_length,
636       extra_headers, false /* allow_existing_header_override */);
637   pending_response->headers = headers;
638 
639   if (content_length >= 0)
640     pending_response->content_length = content_length;
641 
642   if (!mime_type.empty()) {
643     pending_response->mime_type = mime_type;
644     if (!charset.empty())
645       pending_response->charset = charset;
646   }
647 
648   if (header_client_.is_bound()) {
649     header_client_->OnHeadersReceived(
650         headers->raw_headers(), net::IPEndPoint(),
651         base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders,
652                        weak_factory_.GetWeakPtr(),
653                        std::move(pending_response)));
654   } else {
655     ContinueWithResponseHeaders(std::move(pending_response), net::OK,
656                                 absl::nullopt, absl::nullopt);
657   }
658 }
659 
ContinueWithResponseHeaders(network::mojom::URLResponseHeadPtr pending_response,int32_t result,const absl::optional<std::string> & headers,const absl::optional<GURL> & redirect_url)660 void StreamReaderURLLoader::ContinueWithResponseHeaders(
661     network::mojom::URLResponseHeadPtr pending_response,
662     int32_t result,
663     const absl::optional<std::string>& headers,
664     const absl::optional<GURL>& redirect_url) {
665   if (result != net::OK) {
666     RequestComplete(result);
667     return;
668   }
669 
670   if (headers) {
671     DCHECK(header_client_.is_bound());
672     pending_response->headers =
673         base::MakeRefCounted<net::HttpResponseHeaders>(*headers);
674   }
675 
676   auto pending_headers = pending_response->headers;
677 
678   // What the length would be if we sent headers over the network. Used to
679   // calculate data length.
680   header_length_ = pending_headers->raw_headers().length();
681 
682   DCHECK(client_.is_bound());
683 
684   std::string location;
685   const auto has_redirect_url = redirect_url && !redirect_url->is_empty();
686   if (has_redirect_url || pending_headers->IsRedirect(&location)) {
687     pending_response->encoded_data_length = header_length_;
688     pending_response->content_length = pending_response->encoded_body_length =
689         0;
690     const GURL new_location =
691         has_redirect_url ? *redirect_url : request_.url.Resolve(location);
692     client_->OnReceiveRedirect(
693         MakeRedirectInfo(request_, pending_headers.get(), new_location,
694                          pending_headers->response_code()),
695         std::move(pending_response));
696     // The client will restart the request with a new loader.
697     // |this| will be deleted.
698     CleanUp();
699   } else {
700     client_->OnReceiveResponse(std::move(pending_response),
701                                mojo::ScopedDataPipeConsumerHandle());
702   }
703 }
704 
ContinueResponse(bool was_redirected)705 void StreamReaderURLLoader::ContinueResponse(bool was_redirected) {
706   DCHECK(thread_checker_.CalledOnValidThread());
707 
708   if (was_redirected) {
709     // Special case where we allow the client to perform the redirect.
710     // The client will restart the request with a new loader.
711     // |this| will be deleted.
712     CleanUp();
713   } else {
714     SendBody();
715   }
716 }
717 
SendBody()718 void StreamReaderURLLoader::SendBody() {
719   DCHECK(thread_checker_.CalledOnValidThread());
720 
721   mojo::ScopedDataPipeConsumerHandle consumer_handle;
722   if (CreateDataPipe(nullptr /*options*/, producer_handle_, consumer_handle) !=
723       MOJO_RESULT_OK) {
724     RequestComplete(net::ERR_FAILED);
725     return;
726   }
727   writable_handle_watcher_.Watch(
728       producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
729       base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable,
730                           base::Unretained(this)));
731   client_->OnStartLoadingResponseBody(std::move(consumer_handle));
732 
733   ReadMore();
734 }
735 
ReadMore()736 void StreamReaderURLLoader::ReadMore() {
737   DCHECK(thread_checker_.CalledOnValidThread());
738   DCHECK(!pending_buffer_.get());
739 
740   uint32_t num_bytes;
741   MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
742       &producer_handle_, &pending_buffer_, &num_bytes);
743   if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
744     // The pipe is full. We need to wait for it to have more space.
745     writable_handle_watcher_.ArmOrNotify();
746     return;
747   } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
748     // The data pipe consumer handle has been closed.
749     RequestComplete(net::ERR_ABORTED);
750     return;
751   } else if (mojo_result != MOJO_RESULT_OK) {
752     // The body stream is in a bad state. Bail out.
753     RequestComplete(net::ERR_UNEXPECTED);
754     return;
755   }
756   scoped_refptr<net::IOBuffer> buffer(
757       new network::NetToMojoIOBuffer(pending_buffer_.get()));
758 
759   if (!input_stream_reader_.get()) {
760     // This will happen if opening the InputStream fails in which case the
761     // error is communicated by setting the HTTP response status header rather
762     // than failing the request during the header fetch phase.
763     OnReaderReadCompleted(0);
764     return;
765   }
766 
767   input_stream_reader_->Read(
768       buffer, base::checked_cast<int>(num_bytes),
769       base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted,
770                      weak_factory_.GetWeakPtr()));
771 }
772 
OnDataPipeWritable(MojoResult result)773 void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) {
774   if (result == MOJO_RESULT_FAILED_PRECONDITION) {
775     RequestComplete(net::ERR_ABORTED);
776     return;
777   }
778   DCHECK_EQ(result, MOJO_RESULT_OK) << result;
779 
780   ReadMore();
781 }
782 
OnReaderReadCompleted(int bytes_read)783 void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) {
784   DCHECK(thread_checker_.CalledOnValidThread());
785 
786   DCHECK(pending_buffer_);
787   if (bytes_read < 0) {
788     // Error case.
789     RequestComplete(bytes_read);
790     return;
791   }
792   if (bytes_read == 0) {
793     // Eof, read completed.
794     pending_buffer_->Complete(0);
795     RequestComplete(net::OK);
796     return;
797   }
798   producer_handle_ = pending_buffer_->Complete(bytes_read);
799   pending_buffer_ = nullptr;
800 
801   client_->OnTransferSizeUpdated(bytes_read);
802   total_bytes_read_ += bytes_read;
803 
804   base::ThreadTaskRunnerHandle::Get()->PostTask(
805       FROM_HERE, base::BindOnce(&StreamReaderURLLoader::ReadMore,
806                                 weak_factory_.GetWeakPtr()));
807 }
808 
RequestComplete(int status_code)809 void StreamReaderURLLoader::RequestComplete(int status_code) {
810   DCHECK(thread_checker_.CalledOnValidThread());
811 
812   auto status = network::URLLoaderCompletionStatus(status_code);
813   status.completion_time = base::TimeTicks::Now();
814   status.encoded_data_length = total_bytes_read_ + header_length_;
815   status.encoded_body_length = total_bytes_read_;
816   // We don't support decoders, so use the same value.
817   status.decoded_body_length = total_bytes_read_;
818 
819   client_->OnComplete(status);
820   CleanUp();
821 }
822 
CleanUp()823 void StreamReaderURLLoader::CleanUp() {
824   DCHECK(thread_checker_.CalledOnValidThread());
825 
826   // Resets the watchers and pipes, so that we will never be called back.
827   writable_handle_watcher_.Cancel();
828   pending_buffer_ = nullptr;
829   producer_handle_.reset();
830 
831   // Manages its own lifetime.
832   delete this;
833 }
834 
ParseRange(const net::HttpRequestHeaders & headers)835 bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) {
836   DCHECK(thread_checker_.CalledOnValidThread());
837 
838   std::string range_header;
839   if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
840     // This loader only cares about the Range header so that we know how many
841     // bytes in the stream to skip and how many to read after that.
842     std::vector<net::HttpByteRange> ranges;
843     if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
844       // In case of multi-range request only use the first range.
845       // We don't support multirange requests.
846       if (ranges.size() == 1)
847         byte_range_ = ranges[0];
848     } else {
849       // This happens if the range header could not be parsed or is invalid.
850       return false;
851     }
852   }
853   return true;
854 }
855 
byte_range_valid() const856 bool StreamReaderURLLoader::byte_range_valid() const {
857   return byte_range_.IsValid() && byte_range_.first_byte_position() >= 0;
858 }
859 
860 }  // namespace net_service