• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/host/native_messaging/native_messaging_reader.h"
6 
7 #include <string>
8 
9 #include "base/bind.h"
10 #include "base/json/json_reader.h"
11 #include "base/location.h"
12 #include "base/sequenced_task_runner.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "base/threading/sequenced_worker_pool.h"
17 #include "base/values.h"
18 #include "net/base/file_stream.h"
19 
20 namespace {
21 
22 // uint32 is specified in the protocol as the type for the message header.
23 typedef uint32 MessageLengthType;
24 
25 const int kMessageHeaderSize = sizeof(MessageLengthType);
26 
27 // Limit the size of received messages, to avoid excessive memory-allocation in
28 // this process, and potential overflow issues when casting to a signed 32-bit
29 // int.
30 const MessageLengthType kMaximumMessageSize = 1024 * 1024;
31 
32 }  // namespace
33 
34 namespace remoting {
35 
36 class NativeMessagingReader::Core {
37  public:
38   Core(base::PlatformFile handle,
39        scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
40        scoped_refptr<base::SequencedTaskRunner> read_task_runner,
41        base::WeakPtr<NativeMessagingReader> reader_);
42   ~Core();
43 
44   // Reads a message from the Native Messaging client and passes it to
45   // |message_callback_| on the originating thread. Called on the reader thread.
46   void ReadMessage();
47 
48  private:
49   // Notify the reader's EOF callback when an error occurs or EOF is reached.
50   void NotifyEof();
51 
52   net::FileStream read_stream_;
53 
54   base::WeakPtr<NativeMessagingReader> reader_;
55 
56   // Used to post the caller-supplied reader callbacks on the caller thread.
57   scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
58 
59   // Used to DCHECK that the reader code executes on the correct thread.
60   scoped_refptr<base::SequencedTaskRunner> read_task_runner_;
61 
62   DISALLOW_COPY_AND_ASSIGN(Core);
63 };
64 
Core(base::PlatformFile handle,scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,scoped_refptr<base::SequencedTaskRunner> read_task_runner,base::WeakPtr<NativeMessagingReader> reader)65 NativeMessagingReader::Core::Core(
66     base::PlatformFile handle,
67     scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
68     scoped_refptr<base::SequencedTaskRunner> read_task_runner,
69     base::WeakPtr<NativeMessagingReader> reader)
70     : read_stream_(handle, base::PLATFORM_FILE_READ, NULL),
71       reader_(reader),
72       caller_task_runner_(caller_task_runner),
73       read_task_runner_(read_task_runner) {
74 }
75 
~Core()76 NativeMessagingReader::Core::~Core() {}
77 
ReadMessage()78 void NativeMessagingReader::Core::ReadMessage() {
79   DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
80 
81   // Keep reading messages until the stream is closed or an error occurs.
82   while (true) {
83     MessageLengthType message_length;
84     int read_result = read_stream_.ReadUntilComplete(
85         reinterpret_cast<char*>(&message_length), kMessageHeaderSize);
86     if (read_result != kMessageHeaderSize) {
87       // 0 means EOF which is normal and should not be logged as an error.
88       if (read_result != 0) {
89         LOG(ERROR) << "Failed to read message header, read returned "
90                    << read_result;
91       }
92       NotifyEof();
93       return;
94     }
95 
96     if (message_length > kMaximumMessageSize) {
97       LOG(ERROR) << "Message size too large: " << message_length;
98       NotifyEof();
99       return;
100     }
101 
102     std::string message_json(message_length, '\0');
103     read_result = read_stream_.ReadUntilComplete(string_as_array(&message_json),
104                                                  message_length);
105     if (read_result != static_cast<int>(message_length)) {
106       LOG(ERROR) << "Failed to read message body, read returned "
107                  << read_result;
108       NotifyEof();
109       return;
110     }
111 
112     scoped_ptr<base::Value> message(base::JSONReader::Read(message_json));
113     if (!message) {
114       LOG(ERROR) << "Failed to parse JSON message: " << message;
115       NotifyEof();
116       return;
117     }
118 
119     // Notify callback of new message.
120     caller_task_runner_->PostTask(
121         FROM_HERE, base::Bind(&NativeMessagingReader::InvokeMessageCallback,
122                               reader_, base::Passed(&message)));
123   }
124 }
125 
NotifyEof()126 void NativeMessagingReader::Core::NotifyEof() {
127   DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
128   caller_task_runner_->PostTask(
129       FROM_HERE,
130       base::Bind(&NativeMessagingReader::InvokeEofCallback, reader_));
131 }
132 
NativeMessagingReader(base::PlatformFile handle)133 NativeMessagingReader::NativeMessagingReader(base::PlatformFile handle)
134     : reader_thread_("Reader"),
135       weak_factory_(this) {
136   reader_thread_.Start();
137   read_task_runner_ = reader_thread_.message_loop_proxy();
138   core_.reset(new Core(handle, base::ThreadTaskRunnerHandle::Get(),
139                        read_task_runner_, weak_factory_.GetWeakPtr()));
140 }
141 
~NativeMessagingReader()142 NativeMessagingReader::~NativeMessagingReader() {
143   read_task_runner_->DeleteSoon(FROM_HERE, core_.release());
144 }
145 
Start(MessageCallback message_callback,base::Closure eof_callback)146 void NativeMessagingReader::Start(MessageCallback message_callback,
147                                   base::Closure eof_callback) {
148   message_callback_ = message_callback;
149   eof_callback_ = eof_callback;
150 
151   // base::Unretained is safe since |core_| is only deleted via the
152   // DeleteSoon task which is posted from this class's dtor.
153   read_task_runner_->PostTask(
154       FROM_HERE, base::Bind(&NativeMessagingReader::Core::ReadMessage,
155                             base::Unretained(core_.get())));
156 }
157 
InvokeMessageCallback(scoped_ptr<base::Value> message)158 void NativeMessagingReader::InvokeMessageCallback(
159     scoped_ptr<base::Value> message) {
160   message_callback_.Run(message.Pass());
161 }
162 
InvokeEofCallback()163 void NativeMessagingReader::InvokeEofCallback() {
164   eof_callback_.Run();
165 }
166 
167 }  // namespace remoting
168