• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/base/file_stream_context.h"
6 
7 #include <windows.h>
8 
9 #include <utility>
10 
11 #include "base/files/file_path.h"
12 #include "base/functional/bind.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/message_loop/message_pump_for_io.h"
16 #include "base/task/current_thread.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/task/task_runner.h"
19 #include "net/base/io_buffer.h"
20 #include "net/base/net_errors.h"
21 
22 namespace net {
23 
24 namespace {
25 
SetOffset(OVERLAPPED * overlapped,const LARGE_INTEGER & offset)26 void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) {
27   overlapped->Offset = offset.LowPart;
28   overlapped->OffsetHigh = offset.HighPart;
29 }
30 
IncrementOffset(OVERLAPPED * overlapped,DWORD count)31 void IncrementOffset(OVERLAPPED* overlapped, DWORD count) {
32   LARGE_INTEGER offset;
33   offset.LowPart = overlapped->Offset;
34   offset.HighPart = overlapped->OffsetHigh;
35   offset.QuadPart += static_cast<LONGLONG>(count);
36   SetOffset(overlapped, offset);
37 }
38 
39 }  // namespace
40 
Context(scoped_refptr<base::TaskRunner> task_runner)41 FileStream::Context::Context(scoped_refptr<base::TaskRunner> task_runner)
42     : Context(base::File(), std::move(task_runner)) {}
43 
Context(base::File file,scoped_refptr<base::TaskRunner> task_runner)44 FileStream::Context::Context(base::File file,
45                              scoped_refptr<base::TaskRunner> task_runner)
46     : base::MessagePumpForIO::IOHandler(FROM_HERE),
47       file_(std::move(file)),
48       task_runner_(std::move(task_runner)) {
49   if (file_.IsValid()) {
50     DCHECK(file_.async());
51     OnFileOpened();
52   }
53 }
54 
55 FileStream::Context::~Context() = default;
56 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)57 int FileStream::Context::Read(IOBuffer* buf,
58                               int buf_len,
59                               CompletionOnceCallback callback) {
60   DCHECK(!async_in_progress_);
61 
62   DCHECK(!async_read_initiated_);
63   DCHECK(!async_read_completed_);
64   DCHECK(!io_complete_for_read_received_);
65 
66   IOCompletionIsPending(std::move(callback), buf);
67 
68   async_read_initiated_ = true;
69   result_ = 0;
70 
71   task_runner_->PostTask(
72       FROM_HERE,
73       base::BindOnce(&FileStream::Context::ReadAsync, base::Unretained(this),
74                      file_.GetPlatformFile(), base::WrapRefCounted(buf),
75                      buf_len, &io_context_.overlapped,
76                      base::SingleThreadTaskRunner::GetCurrentDefault()));
77   return ERR_IO_PENDING;
78 }
79 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)80 int FileStream::Context::Write(IOBuffer* buf,
81                                int buf_len,
82                                CompletionOnceCallback callback) {
83   DCHECK(!async_in_progress_);
84 
85   result_ = 0;
86 
87   DWORD bytes_written = 0;
88   if (!WriteFile(file_.GetPlatformFile(), buf->data(), buf_len,
89                  &bytes_written, &io_context_.overlapped)) {
90     IOResult error = IOResult::FromOSError(GetLastError());
91     if (error.os_error == ERROR_IO_PENDING) {
92       IOCompletionIsPending(std::move(callback), buf);
93     } else {
94       LOG(WARNING) << "WriteFile failed: " << error.os_error;
95     }
96     return static_cast<int>(error.result);
97   }
98 
99   IOCompletionIsPending(std::move(callback), buf);
100   return ERR_IO_PENDING;
101 }
102 
ConnectNamedPipe(CompletionOnceCallback callback)103 int FileStream::Context::ConnectNamedPipe(CompletionOnceCallback callback) {
104   DCHECK(!async_in_progress_);
105 
106   result_ = 0;
107   // Always returns zero when making an asynchronous call.
108   ::ConnectNamedPipe(file_.GetPlatformFile(), &io_context_.overlapped);
109   const auto error = ::GetLastError();
110   if (error == ERROR_PIPE_CONNECTED) {
111     return OK;  // The client has already connected; operation complete.
112   }
113   if (error == ERROR_IO_PENDING) {
114     IOCompletionIsPending(std::move(callback), /*buf=*/nullptr);
115     return ERR_IO_PENDING;  // Wait for an I/O completion packet.
116   }
117   // ERROR_INVALID_FUNCTION means that `file_` isn't a handle to a named pipe,
118   // but to an actual file. This is a programming error.
119   CHECK_NE(error, static_cast<DWORD>(ERROR_INVALID_FUNCTION));
120   return static_cast<int>(MapSystemError(error));
121 }
122 
SeekFileImpl(int64_t offset)123 FileStream::Context::IOResult FileStream::Context::SeekFileImpl(
124     int64_t offset) {
125   LARGE_INTEGER result;
126   result.QuadPart = offset;
127   SetOffset(&io_context_.overlapped, result);
128   return IOResult(result.QuadPart, 0);
129 }
130 
OnFileOpened()131 void FileStream::Context::OnFileOpened() {
132   if (!base::CurrentIOThread::Get()->RegisterIOHandler(file_.GetPlatformFile(),
133                                                        this)) {
134     file_.Close();
135   }
136 }
137 
IOCompletionIsPending(CompletionOnceCallback callback,IOBuffer * buf)138 void FileStream::Context::IOCompletionIsPending(CompletionOnceCallback callback,
139                                                 IOBuffer* buf) {
140   DCHECK(callback_.is_null());
141   callback_ = std::move(callback);
142   in_flight_buf_ = buf;  // Hold until the async operation ends.
143   async_in_progress_ = true;
144 }
145 
OnIOCompleted(base::MessagePumpForIO::IOContext * context,DWORD bytes_read,DWORD error)146 void FileStream::Context::OnIOCompleted(
147     base::MessagePumpForIO::IOContext* context,
148     DWORD bytes_read,
149     DWORD error) {
150   DCHECK_EQ(&io_context_, context);
151   DCHECK(!callback_.is_null());
152   DCHECK(async_in_progress_);
153 
154   if (!async_read_initiated_)
155     async_in_progress_ = false;
156 
157   if (orphaned_) {
158     io_complete_for_read_received_ = true;
159     // If we are called due to a pending read and the asynchronous read task
160     // has not completed we have to keep the context around until it completes.
161     if (async_read_initiated_ && !async_read_completed_)
162       return;
163     DeleteOrphanedContext();
164     return;
165   }
166 
167   if (error == ERROR_HANDLE_EOF) {
168     result_ = 0;
169   } else if (error) {
170     IOResult error_result = IOResult::FromOSError(error);
171     result_ = static_cast<int>(error_result.result);
172   } else {
173     if (result_)
174       DCHECK_EQ(result_, static_cast<int>(bytes_read));
175     result_ = bytes_read;
176     IncrementOffset(&io_context_.overlapped, bytes_read);
177   }
178 
179   if (async_read_initiated_)
180     io_complete_for_read_received_ = true;
181 
182   InvokeUserCallback();
183 }
184 
InvokeUserCallback()185 void FileStream::Context::InvokeUserCallback() {
186   // For an asynchonous Read operation don't invoke the user callback until
187   // we receive the IO completion notification and the asynchronous Read
188   // completion notification.
189   if (async_read_initiated_) {
190     if (!io_complete_for_read_received_ || !async_read_completed_)
191       return;
192     async_read_initiated_ = false;
193     io_complete_for_read_received_ = false;
194     async_read_completed_ = false;
195     async_in_progress_ = false;
196   }
197   scoped_refptr<IOBuffer> temp_buf = in_flight_buf_;
198   in_flight_buf_ = nullptr;
199   std::move(callback_).Run(result_);
200 }
201 
DeleteOrphanedContext()202 void FileStream::Context::DeleteOrphanedContext() {
203   async_in_progress_ = false;
204   callback_.Reset();
205   in_flight_buf_ = nullptr;
206   CloseAndDelete();
207 }
208 
209 // static
ReadAsync(FileStream::Context * context,HANDLE file,scoped_refptr<IOBuffer> buf,int buf_len,OVERLAPPED * overlapped,scoped_refptr<base::SingleThreadTaskRunner> origin_thread_task_runner)210 void FileStream::Context::ReadAsync(
211     FileStream::Context* context,
212     HANDLE file,
213     scoped_refptr<IOBuffer> buf,
214     int buf_len,
215     OVERLAPPED* overlapped,
216     scoped_refptr<base::SingleThreadTaskRunner> origin_thread_task_runner) {
217   DWORD bytes_read = 0;
218   BOOL ret = ::ReadFile(file, buf->data(), buf_len, &bytes_read, overlapped);
219   origin_thread_task_runner->PostTask(
220       FROM_HERE, base::BindOnce(&FileStream::Context::ReadAsyncResult,
221                                 base::Unretained(context), ret, bytes_read,
222                                 ::GetLastError()));
223 }
224 
ReadAsyncResult(BOOL read_file_ret,DWORD bytes_read,DWORD os_error)225 void FileStream::Context::ReadAsyncResult(BOOL read_file_ret,
226                                           DWORD bytes_read,
227                                           DWORD os_error) {
228   // If the context is orphaned and we already received the io completion
229   // notification then we should delete the context and get out.
230   if (orphaned_ && io_complete_for_read_received_) {
231     DeleteOrphanedContext();
232     return;
233   }
234 
235   async_read_completed_ = true;
236   if (read_file_ret) {
237     result_ = bytes_read;
238     InvokeUserCallback();
239     return;
240   }
241 
242   IOResult error = IOResult::FromOSError(os_error);
243   if (error.os_error == ERROR_IO_PENDING) {
244     InvokeUserCallback();
245   } else {
246     OnIOCompleted(&io_context_, 0, error.os_error);
247   }
248 }
249 
250 }  // namespace net
251