1 // Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions
2 // Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this
3 // source code is governed by a BSD-style license that can be found in the
4 // LICENSE file.
5
6 #include "libcef/browser/net_service/stream_reader_url_loader.h"
7
8 #include "libcef/browser/thread_util.h"
9 #include "libcef/common/net_service/net_service_util.h"
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/task/post_task.h"
17 #include "base/task/thread_pool.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_task_runner_handle.h"
20 #include "content/public/browser/browser_thread.h"
21 #include "net/base/io_buffer.h"
22 #include "net/http/http_status_code.h"
23 #include "net/http/http_util.h"
24 #include "services/network/public/cpp/url_loader_completion_status.h"
25
26 namespace net_service {
27
28 namespace {
29
30 using OnInputStreamOpenedCallback =
31 base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>,
32 std::unique_ptr<InputStream>)>;
33
34 // Helper for executing the OnInputStreamOpenedCallback.
35 class OpenInputStreamWrapper
36 : public base::RefCountedThreadSafe<OpenInputStreamWrapper> {
37 public:
38 OpenInputStreamWrapper(const OpenInputStreamWrapper&) = delete;
39 OpenInputStreamWrapper& operator=(const OpenInputStreamWrapper&) = delete;
40
Open(std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int32_t request_id,const network::ResourceRequest & request,OnInputStreamOpenedCallback callback)41 static base::OnceClosure Open(
42 std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
43 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
44 int32_t request_id,
45 const network::ResourceRequest& request,
46 OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT {
47 scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper(
48 std::move(delegate), work_thread_task_runner,
49 base::ThreadTaskRunnerHandle::Get(), std::move(callback));
50 wrapper->Start(request_id, request);
51
52 return wrapper->GetCancelCallback();
53 }
54
55 private:
56 friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>;
57
OpenInputStreamWrapper(std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,OnInputStreamOpenedCallback callback)58 OpenInputStreamWrapper(
59 std::unique_ptr<StreamReaderURLLoader::Delegate> delegate,
60 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
61 scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner,
62 OnInputStreamOpenedCallback callback)
63 : delegate_(std::move(delegate)),
64 work_thread_task_runner_(work_thread_task_runner),
65 job_thread_task_runner_(job_thread_task_runner),
66 callback_(std::move(callback)) {}
67 virtual ~OpenInputStreamWrapper() = default;
68
Start(int32_t request_id,const network::ResourceRequest & request)69 void Start(int32_t request_id, const network::ResourceRequest& request) {
70 work_thread_task_runner_->PostTask(
71 FROM_HERE,
72 base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread,
73 base::WrapRefCounted(this), request_id, request));
74 }
75
GetCancelCallback()76 base::OnceClosure GetCancelCallback() {
77 return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread,
78 base::WrapRefCounted(this));
79 }
80
CancelOnJobThread()81 void CancelOnJobThread() {
82 DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence());
83 if (callback_.is_null())
84 return;
85
86 callback_.Reset();
87
88 work_thread_task_runner_->PostTask(
89 FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread,
90 base::WrapRefCounted(this)));
91 }
92
CancelOnWorkThread()93 void CancelOnWorkThread() {
94 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
95 if (is_canceled_)
96 return;
97 is_canceled_ = true;
98 OnCallback(nullptr);
99 }
100
OpenOnWorkThread(int32_t request_id,const network::ResourceRequest & request)101 void OpenOnWorkThread(int32_t request_id,
102 const network::ResourceRequest& request) {
103 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
104 if (is_canceled_)
105 return;
106
107 // |delegate_| will remain valid until OnCallback() is executed on
108 // |job_thread_task_runner_|.
109 if (!delegate_->OpenInputStream(
110 request_id, request,
111 base::BindOnce(&OpenInputStreamWrapper::OnCallback,
112 base::WrapRefCounted(this)))) {
113 is_canceled_ = true;
114 OnCallback(nullptr);
115 }
116 }
117
OnCallback(std::unique_ptr<InputStream> input_stream)118 void OnCallback(std::unique_ptr<InputStream> input_stream) {
119 if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) {
120 job_thread_task_runner_->PostTask(
121 FROM_HERE,
122 base::BindOnce(&OpenInputStreamWrapper::OnCallback,
123 base::WrapRefCounted(this), std::move(input_stream)));
124 return;
125 }
126
127 // May be null if CancelOnJobThread() was called on
128 // |job_thread_task_runner_| while OpenOnWorkThread() was pending on
129 // |work_thread_task_runner_|.
130 if (callback_.is_null()) {
131 delegate_.reset();
132 return;
133 }
134
135 std::move(callback_).Run(std::move(delegate_), std::move(input_stream));
136 }
137
138 std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_;
139
140 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
141 scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
142
143 // Only accessed on |job_thread_task_runner_|.
144 OnInputStreamOpenedCallback callback_;
145
146 // Only accessed on |work_thread_task_runner_|.
147 bool is_canceled_ = false;
148 };
149
150 } // namespace
151
152 //==============================
153 // InputStreamReader
154 //=============================
155
156 // Class responsible for reading from the InputStream.
157 class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> {
158 public:
159 // The constructor is called on the IO thread, not on the worker thread.
160 // Callbacks will be executed on the IO thread.
161 InputStreamReader(
162 std::unique_ptr<InputStream> stream,
163 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner);
164
165 InputStreamReader(const InputStreamReader&) = delete;
166 InputStreamReader& operator=(const InputStreamReader&) = delete;
167
168 // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be
169 // executed asynchronously on the IO thread. A negative value passed to
170 // |callback| will indicate an error code, a positive value will indicate the
171 // number of bytes skipped.
172 void Skip(int64_t skip_bytes, InputStream::SkipCallback callback);
173
174 // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be
175 // executed asynchronously on the IO thread. A negative value passed to
176 // |callback| will indicate an error code, a positive value will indicate the
177 // number of bytes read.
178 void Read(scoped_refptr<net::IOBuffer> dest,
179 int dest_size,
180 InputStream::ReadCallback callback);
181
182 private:
183 friend class base::RefCountedThreadSafe<InputStreamReader>;
184 virtual ~InputStreamReader();
185
186 void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback);
187 void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer,
188 int buffer_size,
189 InputStream::ReadCallback callback);
190
191 void SkipToRequestedRange();
192
193 static void ContinueSkipCallback(
194 scoped_refptr<InputStreamReader> stream,
195 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
196 int callback_id,
197 int64_t bytes_skipped);
198 static void ContinueReadCallback(
199 scoped_refptr<InputStreamReader> stream,
200 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
201 int callback_id,
202 int bytes_read);
203
204 void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped);
205 void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read);
206
207 void RunSkipCallback(int64_t bytes_skipped);
208 void RunReadCallback(int bytes_read);
209
210 static void RunSkipCallbackOnJobThread(
211 int64_t bytes_skipped,
212 InputStream::SkipCallback skip_callback);
213 static void RunReadCallbackOnJobThread(
214 int bytes_read,
215 InputStream::ReadCallback read_callback);
216
217 std::unique_ptr<InputStream> stream_;
218
219 // All InputStream methods are called this task runner.
220 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_;
221
222 // All callbacks are executed on this task runner.
223 scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_;
224
225 // The below members are only accessed on the work thread.
226 int64_t bytes_skipped_;
227 int64_t bytes_to_skip_;
228 InputStream::SkipCallback pending_skip_callback_;
229
230 scoped_refptr<net::IOBuffer> buffer_;
231 InputStream::ReadCallback pending_read_callback_;
232
233 int pending_callback_id_ = -1;
234
235 int next_callback_id_ = 0;
236 };
237
InputStreamReader(std::unique_ptr<InputStream> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)238 InputStreamReader::InputStreamReader(
239 std::unique_ptr<InputStream> stream,
240 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner)
241 : stream_(std::move(stream)),
242 work_thread_task_runner_(work_thread_task_runner),
243 job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
244 CEF_REQUIRE_IOT();
245 DCHECK(stream_);
246 DCHECK(work_thread_task_runner_);
247 }
248
~InputStreamReader()249 InputStreamReader::~InputStreamReader() {}
250
Skip(int64_t skip_bytes,InputStream::SkipCallback callback)251 void InputStreamReader::Skip(int64_t skip_bytes,
252 InputStream::SkipCallback callback) {
253 work_thread_task_runner_->PostTask(
254 FROM_HERE, base::BindOnce(&InputStreamReader::SkipOnWorkThread,
255 base::WrapRefCounted(this), skip_bytes,
256 std::move(callback)));
257 }
258
Read(scoped_refptr<net::IOBuffer> dest,int dest_size,InputStream::ReadCallback callback)259 void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest,
260 int dest_size,
261 InputStream::ReadCallback callback) {
262 work_thread_task_runner_->PostTask(
263 FROM_HERE, base::BindOnce(&InputStreamReader::ReadOnWorkThread,
264 base::WrapRefCounted(this), dest, dest_size,
265 std::move(callback)));
266 }
267
SkipOnWorkThread(int64_t skip_bytes,InputStream::SkipCallback callback)268 void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes,
269 InputStream::SkipCallback callback) {
270 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
271
272 // No callback should currently be pending.
273 DCHECK_EQ(pending_callback_id_, -1);
274 DCHECK(pending_skip_callback_.is_null());
275
276 pending_skip_callback_ = std::move(callback);
277
278 if (skip_bytes <= 0) {
279 RunSkipCallback(0);
280 return;
281 }
282
283 bytes_skipped_ = bytes_to_skip_ = skip_bytes;
284 SkipToRequestedRange();
285 }
286
ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,int dest_size,InputStream::ReadCallback callback)287 void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest,
288 int dest_size,
289 InputStream::ReadCallback callback) {
290 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
291
292 // No callback should currently be pending.
293 DCHECK_EQ(pending_callback_id_, -1);
294 DCHECK(pending_read_callback_.is_null());
295
296 pending_read_callback_ = std::move(callback);
297
298 if (!dest_size) {
299 RunReadCallback(0);
300 return;
301 }
302
303 DCHECK_GT(dest_size, 0);
304
305 buffer_ = dest;
306 pending_callback_id_ = ++next_callback_id_;
307
308 int bytes_read = 0;
309 bool result = stream_->Read(
310 buffer_.get(), dest_size, &bytes_read,
311 base::BindOnce(&InputStreamReader::ContinueReadCallback,
312 base::WrapRefCounted(this), work_thread_task_runner_,
313 pending_callback_id_));
314
315 // Check if the callback will execute asynchronously.
316 if (result && bytes_read == 0)
317 return;
318
319 RunReadCallback(result || bytes_read <= 0 ? bytes_read : net::ERR_FAILED);
320 }
321
SkipToRequestedRange()322 void InputStreamReader::SkipToRequestedRange() {
323 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
324
325 // Skip to the start of the requested data. This has to be done in a loop
326 // because the underlying InputStream is not guaranteed to skip the requested
327 // number of bytes.
328 do {
329 pending_callback_id_ = ++next_callback_id_;
330
331 int64_t skipped = 0;
332 bool result = stream_->Skip(
333 bytes_to_skip_, &skipped,
334 base::BindOnce(&InputStreamReader::ContinueSkipCallback,
335 base::WrapRefCounted(this), work_thread_task_runner_,
336 pending_callback_id_));
337
338 // Check if the callback will execute asynchronously.
339 if (result && skipped == 0)
340 return;
341
342 if (!result || skipped <= 0) {
343 RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
344 return;
345 }
346 DCHECK_LE(skipped, bytes_to_skip_);
347
348 bytes_to_skip_ -= skipped;
349 } while (bytes_to_skip_ > 0);
350
351 // All done, the requested number of bytes were skipped.
352 RunSkipCallback(bytes_skipped_);
353 }
354
355 // static
ContinueSkipCallback(scoped_refptr<InputStreamReader> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int callback_id,int64_t bytes_skipped)356 void InputStreamReader::ContinueSkipCallback(
357 scoped_refptr<InputStreamReader> stream,
358 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
359 int callback_id,
360 int64_t bytes_skipped) {
361 // Always execute asynchronously.
362 work_thread_task_runner->PostTask(
363 FROM_HERE,
364 base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread,
365 stream, callback_id, bytes_skipped));
366 }
367
368 // static
ContinueReadCallback(scoped_refptr<InputStreamReader> stream,scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,int callback_id,int bytes_read)369 void InputStreamReader::ContinueReadCallback(
370 scoped_refptr<InputStreamReader> stream,
371 scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner,
372 int callback_id,
373 int bytes_read) {
374 // Always execute asynchronously.
375 work_thread_task_runner->PostTask(
376 FROM_HERE,
377 base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread,
378 stream, callback_id, bytes_read));
379 }
380
ContinueSkipCallbackOnWorkThread(int callback_id,int64_t bytes_skipped)381 void InputStreamReader::ContinueSkipCallbackOnWorkThread(
382 int callback_id,
383 int64_t bytes_skipped) {
384 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
385
386 // Check for out of order callbacks.
387 if (pending_callback_id_ != callback_id)
388 return;
389
390 DCHECK_LE(bytes_skipped, bytes_to_skip_);
391
392 if (bytes_to_skip_ > 0 && bytes_skipped > 0)
393 bytes_to_skip_ -= bytes_skipped;
394
395 if (bytes_skipped <= 0) {
396 RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
397 } else if (bytes_to_skip_ > 0) {
398 // Continue execution asynchronously.
399 work_thread_task_runner_->PostTask(
400 FROM_HERE,
401 base::BindOnce(&InputStreamReader::SkipToRequestedRange, this));
402 } else {
403 // All done, the requested number of bytes were skipped.
404 RunSkipCallback(bytes_skipped_);
405 }
406 }
407
ContinueReadCallbackOnWorkThread(int callback_id,int bytes_read)408 void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id,
409 int bytes_read) {
410 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
411
412 // Check for out of order callbacks.
413 if (pending_callback_id_ != callback_id)
414 return;
415
416 RunReadCallback(bytes_read);
417 }
418
RunSkipCallback(int64_t bytes_skipped)419 void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) {
420 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
421
422 DCHECK(!pending_skip_callback_.is_null());
423 job_thread_task_runner_->PostTask(
424 FROM_HERE,
425 base::BindOnce(InputStreamReader::RunSkipCallbackOnJobThread,
426 bytes_skipped, std::move(pending_skip_callback_)));
427
428 // Reset callback state.
429 pending_callback_id_ = -1;
430 bytes_skipped_ = bytes_to_skip_ = -1;
431 }
432
RunReadCallback(int bytes_read)433 void InputStreamReader::RunReadCallback(int bytes_read) {
434 DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence());
435
436 DCHECK(!pending_read_callback_.is_null());
437 job_thread_task_runner_->PostTask(
438 FROM_HERE, base::BindOnce(InputStreamReader::RunReadCallbackOnJobThread,
439 bytes_read, std::move(pending_read_callback_)));
440
441 // Reset callback state.
442 pending_callback_id_ = -1;
443 buffer_ = nullptr;
444 }
445
446 // static
RunSkipCallbackOnJobThread(int64_t bytes_skipped,InputStream::SkipCallback skip_callback)447 void InputStreamReader::RunSkipCallbackOnJobThread(
448 int64_t bytes_skipped,
449 InputStream::SkipCallback skip_callback) {
450 std::move(skip_callback).Run(bytes_skipped);
451 }
452
453 // static
RunReadCallbackOnJobThread(int bytes_read,InputStream::ReadCallback read_callback)454 void InputStreamReader::RunReadCallbackOnJobThread(
455 int bytes_read,
456 InputStream::ReadCallback read_callback) {
457 std::move(read_callback).Run(bytes_read);
458 }
459
460 //==============================
461 // StreamReaderURLLoader
462 //=============================
463
StreamReaderURLLoader(int32_t request_id,const network::ResourceRequest & request,mojo::PendingRemote<network::mojom::URLLoaderClient> client,mojo::PendingRemote<network::mojom::TrustedHeaderClient> header_client,const net::MutableNetworkTrafficAnnotationTag & traffic_annotation,std::unique_ptr<Delegate> response_delegate)464 StreamReaderURLLoader::StreamReaderURLLoader(
465 int32_t request_id,
466 const network::ResourceRequest& request,
467 mojo::PendingRemote<network::mojom::URLLoaderClient> client,
468 mojo::PendingRemote<network::mojom::TrustedHeaderClient> header_client,
469 const net::MutableNetworkTrafficAnnotationTag& traffic_annotation,
470 std::unique_ptr<Delegate> response_delegate)
471 : request_id_(request_id),
472 request_(request),
473 client_(std::move(client)),
474 header_client_(std::move(header_client)),
475 traffic_annotation_(traffic_annotation),
476 response_delegate_(std::move(response_delegate)),
477 writable_handle_watcher_(FROM_HERE,
478 mojo::SimpleWatcher::ArmingPolicy::MANUAL,
479 base::SequencedTaskRunnerHandle::Get()),
480 weak_factory_(this) {
481 DCHECK(response_delegate_);
482 // If there is a client error, clean up the request.
483 client_.set_disconnect_handler(
484 base::BindOnce(&StreamReaderURLLoader::RequestComplete,
485 weak_factory_.GetWeakPtr(), net::ERR_ABORTED));
486
487 // All InputStream work will be performed on this task runner.
488 stream_work_task_runner_ =
489 base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()});
490 }
491
~StreamReaderURLLoader()492 StreamReaderURLLoader::~StreamReaderURLLoader() {
493 if (open_cancel_callback_) {
494 // Release the Delegate held by OpenInputStreamWrapper.
495 std::move(open_cancel_callback_).Run();
496 }
497 }
498
Start()499 void StreamReaderURLLoader::Start() {
500 DCHECK(thread_checker_.CalledOnValidThread());
501
502 if (!ParseRange(request_.headers)) {
503 RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE);
504 return;
505 }
506
507 if (header_client_.is_bound()) {
508 header_client_->OnBeforeSendHeaders(
509 request_.headers,
510 base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders,
511 weak_factory_.GetWeakPtr()));
512 } else {
513 ContinueWithRequestHeaders(net::OK, absl::nullopt);
514 }
515 }
516
ContinueWithRequestHeaders(int32_t result,const absl::optional<net::HttpRequestHeaders> & headers)517 void StreamReaderURLLoader::ContinueWithRequestHeaders(
518 int32_t result,
519 const absl::optional<net::HttpRequestHeaders>& headers) {
520 if (result != net::OK) {
521 RequestComplete(result);
522 return;
523 }
524
525 if (headers) {
526 DCHECK(header_client_.is_bound());
527 request_.headers = *headers;
528 }
529
530 open_cancel_callback_ = OpenInputStreamWrapper::Open(
531 // This is intentional - the loader could be deleted while
532 // the callback is executing on the background thread. The
533 // delegate will be "returned" to the loader once the
534 // InputStream open attempt is completed.
535 std::move(response_delegate_), stream_work_task_runner_, request_id_,
536 request_,
537 base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened,
538 weak_factory_.GetWeakPtr()));
539 }
540
FollowRedirect(const std::vector<std::string> & removed_headers,const net::HttpRequestHeaders & modified_headers,const net::HttpRequestHeaders & modified_cors_exempt_headers,const absl::optional<GURL> & new_url)541 void StreamReaderURLLoader::FollowRedirect(
542 const std::vector<std::string>& removed_headers,
543 const net::HttpRequestHeaders& modified_headers,
544 const net::HttpRequestHeaders& modified_cors_exempt_headers,
545 const absl::optional<GURL>& new_url) {
546 NOTREACHED();
547 }
548
SetPriority(net::RequestPriority priority,int intra_priority_value)549 void StreamReaderURLLoader::SetPriority(net::RequestPriority priority,
550 int intra_priority_value) {}
551
PauseReadingBodyFromNet()552 void StreamReaderURLLoader::PauseReadingBodyFromNet() {}
553
ResumeReadingBodyFromNet()554 void StreamReaderURLLoader::ResumeReadingBodyFromNet() {}
555
OnInputStreamOpened(std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,std::unique_ptr<InputStream> input_stream)556 void StreamReaderURLLoader::OnInputStreamOpened(
557 std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate,
558 std::unique_ptr<InputStream> input_stream) {
559 DCHECK(thread_checker_.CalledOnValidThread());
560 DCHECK(returned_delegate);
561 response_delegate_ = std::move(returned_delegate);
562 open_cancel_callback_.Reset();
563
564 if (!input_stream) {
565 bool restarted = false;
566 response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted);
567 if (restarted) {
568 // The request has been restarted with a new loader.
569 // |this| will be deleted.
570 CleanUp();
571 } else {
572 HeadersComplete(net::HTTP_NOT_FOUND, -1);
573 }
574 return;
575 }
576
577 input_stream_reader_ = base::MakeRefCounted<InputStreamReader>(
578 std::move(input_stream), stream_work_task_runner_);
579
580 if (!byte_range_valid()) {
581 OnReaderSkipCompleted(0);
582 } else {
583 input_stream_reader_->Skip(
584 byte_range_.first_byte_position(),
585 base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted,
586 weak_factory_.GetWeakPtr()));
587 }
588 }
589
OnReaderSkipCompleted(int64_t bytes_skipped)590 void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) {
591 DCHECK(thread_checker_.CalledOnValidThread());
592
593 if (!byte_range_valid()) {
594 // Expected content length is unspecified.
595 HeadersComplete(net::HTTP_OK, -1);
596 } else if (bytes_skipped == byte_range_.first_byte_position()) {
597 // We skipped the expected number of bytes.
598 int64_t expected_content_length = -1;
599 if (byte_range_.HasLastBytePosition()) {
600 expected_content_length = byte_range_.last_byte_position() -
601 byte_range_.first_byte_position() + 1;
602 DCHECK_GE(expected_content_length, 0);
603 }
604 HeadersComplete(net::HTTP_OK, expected_content_length);
605 } else {
606 RequestComplete(bytes_skipped < 0 ? bytes_skipped : net::ERR_FAILED);
607 }
608 }
609
HeadersComplete(int orig_status_code,int64_t expected_content_length)610 void StreamReaderURLLoader::HeadersComplete(int orig_status_code,
611 int64_t expected_content_length) {
612 DCHECK(thread_checker_.CalledOnValidThread());
613
614 int status_code = orig_status_code;
615 std::string status_text =
616 net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code));
617 std::string mime_type, charset;
618 int64_t content_length = expected_content_length;
619 ResourceResponse::HeaderMap extra_headers;
620 response_delegate_->GetResponseHeaders(request_id_, &status_code,
621 &status_text, &mime_type, &charset,
622 &content_length, &extra_headers);
623
624 if (status_code < 0) {
625 // Early exit if the handler reported an error.
626 RequestComplete(status_code);
627 return;
628 }
629
630 auto pending_response = network::mojom::URLResponseHead::New();
631 pending_response->request_start = base::TimeTicks::Now();
632 pending_response->response_start = base::TimeTicks::Now();
633
634 auto headers = MakeResponseHeaders(
635 status_code, status_text, mime_type, charset, content_length,
636 extra_headers, false /* allow_existing_header_override */);
637 pending_response->headers = headers;
638
639 if (content_length >= 0)
640 pending_response->content_length = content_length;
641
642 if (!mime_type.empty()) {
643 pending_response->mime_type = mime_type;
644 if (!charset.empty())
645 pending_response->charset = charset;
646 }
647
648 if (header_client_.is_bound()) {
649 header_client_->OnHeadersReceived(
650 headers->raw_headers(), net::IPEndPoint(),
651 base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders,
652 weak_factory_.GetWeakPtr(),
653 std::move(pending_response)));
654 } else {
655 ContinueWithResponseHeaders(std::move(pending_response), net::OK,
656 absl::nullopt, absl::nullopt);
657 }
658 }
659
ContinueWithResponseHeaders(network::mojom::URLResponseHeadPtr pending_response,int32_t result,const absl::optional<std::string> & headers,const absl::optional<GURL> & redirect_url)660 void StreamReaderURLLoader::ContinueWithResponseHeaders(
661 network::mojom::URLResponseHeadPtr pending_response,
662 int32_t result,
663 const absl::optional<std::string>& headers,
664 const absl::optional<GURL>& redirect_url) {
665 if (result != net::OK) {
666 RequestComplete(result);
667 return;
668 }
669
670 if (headers) {
671 DCHECK(header_client_.is_bound());
672 pending_response->headers =
673 base::MakeRefCounted<net::HttpResponseHeaders>(*headers);
674 }
675
676 auto pending_headers = pending_response->headers;
677
678 // What the length would be if we sent headers over the network. Used to
679 // calculate data length.
680 header_length_ = pending_headers->raw_headers().length();
681
682 DCHECK(client_.is_bound());
683
684 std::string location;
685 const auto has_redirect_url = redirect_url && !redirect_url->is_empty();
686 if (has_redirect_url || pending_headers->IsRedirect(&location)) {
687 pending_response->encoded_data_length = header_length_;
688 pending_response->content_length = pending_response->encoded_body_length =
689 0;
690 const GURL new_location =
691 has_redirect_url ? *redirect_url : request_.url.Resolve(location);
692 client_->OnReceiveRedirect(
693 MakeRedirectInfo(request_, pending_headers.get(), new_location,
694 pending_headers->response_code()),
695 std::move(pending_response));
696 // The client will restart the request with a new loader.
697 // |this| will be deleted.
698 CleanUp();
699 } else {
700 client_->OnReceiveResponse(std::move(pending_response),
701 mojo::ScopedDataPipeConsumerHandle());
702 }
703 }
704
ContinueResponse(bool was_redirected)705 void StreamReaderURLLoader::ContinueResponse(bool was_redirected) {
706 DCHECK(thread_checker_.CalledOnValidThread());
707
708 if (was_redirected) {
709 // Special case where we allow the client to perform the redirect.
710 // The client will restart the request with a new loader.
711 // |this| will be deleted.
712 CleanUp();
713 } else {
714 SendBody();
715 }
716 }
717
SendBody()718 void StreamReaderURLLoader::SendBody() {
719 DCHECK(thread_checker_.CalledOnValidThread());
720
721 mojo::ScopedDataPipeConsumerHandle consumer_handle;
722 if (CreateDataPipe(nullptr /*options*/, producer_handle_, consumer_handle) !=
723 MOJO_RESULT_OK) {
724 RequestComplete(net::ERR_FAILED);
725 return;
726 }
727 writable_handle_watcher_.Watch(
728 producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
729 base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable,
730 base::Unretained(this)));
731 client_->OnStartLoadingResponseBody(std::move(consumer_handle));
732
733 ReadMore();
734 }
735
ReadMore()736 void StreamReaderURLLoader::ReadMore() {
737 DCHECK(thread_checker_.CalledOnValidThread());
738 DCHECK(!pending_buffer_.get());
739
740 uint32_t num_bytes;
741 MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
742 &producer_handle_, &pending_buffer_, &num_bytes);
743 if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
744 // The pipe is full. We need to wait for it to have more space.
745 writable_handle_watcher_.ArmOrNotify();
746 return;
747 } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
748 // The data pipe consumer handle has been closed.
749 RequestComplete(net::ERR_ABORTED);
750 return;
751 } else if (mojo_result != MOJO_RESULT_OK) {
752 // The body stream is in a bad state. Bail out.
753 RequestComplete(net::ERR_UNEXPECTED);
754 return;
755 }
756 scoped_refptr<net::IOBuffer> buffer(
757 new network::NetToMojoIOBuffer(pending_buffer_.get()));
758
759 if (!input_stream_reader_.get()) {
760 // This will happen if opening the InputStream fails in which case the
761 // error is communicated by setting the HTTP response status header rather
762 // than failing the request during the header fetch phase.
763 OnReaderReadCompleted(0);
764 return;
765 }
766
767 input_stream_reader_->Read(
768 buffer, base::checked_cast<int>(num_bytes),
769 base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted,
770 weak_factory_.GetWeakPtr()));
771 }
772
OnDataPipeWritable(MojoResult result)773 void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) {
774 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
775 RequestComplete(net::ERR_ABORTED);
776 return;
777 }
778 DCHECK_EQ(result, MOJO_RESULT_OK) << result;
779
780 ReadMore();
781 }
782
OnReaderReadCompleted(int bytes_read)783 void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) {
784 DCHECK(thread_checker_.CalledOnValidThread());
785
786 DCHECK(pending_buffer_);
787 if (bytes_read < 0) {
788 // Error case.
789 RequestComplete(bytes_read);
790 return;
791 }
792 if (bytes_read == 0) {
793 // Eof, read completed.
794 pending_buffer_->Complete(0);
795 RequestComplete(net::OK);
796 return;
797 }
798 producer_handle_ = pending_buffer_->Complete(bytes_read);
799 pending_buffer_ = nullptr;
800
801 client_->OnTransferSizeUpdated(bytes_read);
802 total_bytes_read_ += bytes_read;
803
804 base::ThreadTaskRunnerHandle::Get()->PostTask(
805 FROM_HERE, base::BindOnce(&StreamReaderURLLoader::ReadMore,
806 weak_factory_.GetWeakPtr()));
807 }
808
RequestComplete(int status_code)809 void StreamReaderURLLoader::RequestComplete(int status_code) {
810 DCHECK(thread_checker_.CalledOnValidThread());
811
812 auto status = network::URLLoaderCompletionStatus(status_code);
813 status.completion_time = base::TimeTicks::Now();
814 status.encoded_data_length = total_bytes_read_ + header_length_;
815 status.encoded_body_length = total_bytes_read_;
816 // We don't support decoders, so use the same value.
817 status.decoded_body_length = total_bytes_read_;
818
819 client_->OnComplete(status);
820 CleanUp();
821 }
822
CleanUp()823 void StreamReaderURLLoader::CleanUp() {
824 DCHECK(thread_checker_.CalledOnValidThread());
825
826 // Resets the watchers and pipes, so that we will never be called back.
827 writable_handle_watcher_.Cancel();
828 pending_buffer_ = nullptr;
829 producer_handle_.reset();
830
831 // Manages its own lifetime.
832 delete this;
833 }
834
ParseRange(const net::HttpRequestHeaders & headers)835 bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) {
836 DCHECK(thread_checker_.CalledOnValidThread());
837
838 std::string range_header;
839 if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
840 // This loader only cares about the Range header so that we know how many
841 // bytes in the stream to skip and how many to read after that.
842 std::vector<net::HttpByteRange> ranges;
843 if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
844 // In case of multi-range request only use the first range.
845 // We don't support multirange requests.
846 if (ranges.size() == 1)
847 byte_range_ = ranges[0];
848 } else {
849 // This happens if the range header could not be parsed or is invalid.
850 return false;
851 }
852 }
853 return true;
854 }
855
byte_range_valid() const856 bool StreamReaderURLLoader::byte_range_valid() const {
857 return byte_range_.IsValid() && byte_range_.first_byte_position() >= 0;
858 }
859
860 } // namespace net_service