1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <chrono>
18 #include <ctime>
19 #include <fstream>
20 #include <iostream>
21 #include <memory>
22 #include <string>
23
24 #include <grpcpp.h>
25
26 #include "gnss_grpc_proxy.grpc.pb.h"
27
28 #include <signal.h>
29
30 #include <chrono>
31 #include <deque>
32 #include <mutex>
33 #include <sstream>
34 #include <thread>
35 #include <vector>
36
37 #include <android-base/logging.h>
38 #include <android-base/strings.h>
39 #include <gflags/gflags.h>
40
41 #include <common/libs/fs/shared_fd.h>
42 #include <common/libs/fs/shared_buf.h>
43 #include <common/libs/fs/shared_select.h>
44 #include <host/libs/config/cuttlefish_config.h>
45
46 using grpc::Server;
47 using grpc::ServerBuilder;
48 using grpc::ServerContext;
49 using grpc::Status;
50 using gnss_grpc_proxy::SendNmeaRequest;
51 using gnss_grpc_proxy::SendNmeaReply;
52 using gnss_grpc_proxy::GnssGrpcProxy;
53
54 DEFINE_int32(gnss_in_fd,
55 -1,
56 "File descriptor for the gnss's input channel");
57 DEFINE_int32(gnss_out_fd,
58 -1,
59 "File descriptor for the gnss's output channel");
60
61 DEFINE_int32(gnss_grpc_port,
62 -1,
63 "Service port for gnss grpc");
64
65 DEFINE_string(gnss_file_path,
66 "",
67 "NMEA file path for gnss grpc");
68
69 constexpr char CMD_GET_LOCATION[] = "CMD_GET_LOCATION";
70 constexpr char CMD_GET_RAWMEASUREMENT[] = "CMD_GET_RAWMEASUREMENT";
71 constexpr char END_OF_MSG_MARK[] = "\n\n\n\n";
72
73 constexpr uint32_t GNSS_SERIAL_BUFFER_SIZE = 4096;
74 // Logic and data behind the server's behavior.
75 class GnssGrpcProxyServiceImpl final : public GnssGrpcProxy::Service {
76 public:
GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,cuttlefish::SharedFD gnss_out)77 GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,
78 cuttlefish::SharedFD gnss_out) : gnss_in_(gnss_in),
79 gnss_out_(gnss_out) {}
SendNmea(ServerContext * context,const SendNmeaRequest * request,SendNmeaReply * reply)80 Status SendNmea(ServerContext* context, const SendNmeaRequest* request,
81 SendNmeaReply* reply) override {
82 reply->set_reply("Received nmea record.");
83 auto buffer = request->nmea();
84 std::lock_guard<std::mutex> lock(cached_nmea_mutex);
85 cached_nmea = request->nmea();
86 return Status::OK;
87 }
88
sendToSerial()89 void sendToSerial() {
90 std::lock_guard<std::mutex> lock(cached_nmea_mutex);
91 if (!isNMEA(cached_nmea)) {
92 return;
93 }
94 ssize_t bytes_written =
95 cuttlefish::WriteAll(gnss_in_, cached_nmea + END_OF_MSG_MARK);
96 if (bytes_written < 0) {
97 LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
98 }
99 }
100
sendGnssRawToSerial()101 void sendGnssRawToSerial() {
102 std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
103 if (!isGnssRawMeasurement(cached_gnss_raw)) {
104 return;
105 }
106 if (previous_cached_gnss_raw == cached_gnss_raw) {
107 // Skip for same record
108 return;
109 } else {
110 // Update cached data
111 LOG(DEBUG) << "Skip same record";
112 previous_cached_gnss_raw = cached_gnss_raw;
113 }
114 ssize_t bytes_written =
115 cuttlefish::WriteAll(gnss_in_, cached_gnss_raw + END_OF_MSG_MARK);
116 LOG(DEBUG) << "Send Gnss Raw to serial: bytes_written: " << bytes_written;
117 if (bytes_written < 0) {
118 LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
119 }
120 }
121
StartServer()122 void StartServer() {
123 // Create a new thread to handle writes to the gnss and to the any client
124 // connected to the socket.
125 read_thread_ = std::thread([this]() { ReadLoop(); });
126 }
127
StartReadNmeaFileThread()128 void StartReadNmeaFileThread() {
129 // Create a new thread to read nmea data.
130 nmea_file_read_thread_ =
131 std::thread([this]() { ReadNmeaFromLocalFile(); });
132 }
133
StartReadGnssRawMeasurementFileThread()134 void StartReadGnssRawMeasurementFileThread() {
135 // Create a new thread to read raw measurement data.
136 measurement_file_read_thread_ =
137 std::thread([this]() { ReadGnssRawMeasurement(); });
138 }
139
ReadNmeaFromLocalFile()140 void ReadNmeaFromLocalFile() {
141 std::ifstream file(FLAGS_gnss_file_path);
142 if (file.is_open()) {
143 std::string line;
144 std::string lastLine;
145 int count = 0;
146 while (std::getline(file, line)) {
147 count++;
148 /* Only support a lite version of NEMA format to make it simple.
149 * Records will only contains $GPGGA, $GPRMC,
150 * $GPGGA,213204.00,3725.371240,N,12205.589239,W,7,,0.38,-26.75,M,0.0,M,0.0,0000*78
151 * $GPRMC,213204.00,A,3725.371240,N,12205.589239,W,000.0,000.0,290819,,,A*49
152 * $GPGGA,....
153 * $GPRMC,....
154 * Sending at 1Hz, currently user should
155 * provide a NMEA file that has one location per second. need some extra work
156 * to make it more generic, i.e. align with the timestamp in the file.
157 */
158 if (count % 2 == 0) {
159 {
160 std::lock_guard<std::mutex> lock(cached_nmea_mutex);
161 cached_nmea = lastLine + '\n' + line;
162 }
163 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
164 }
165 lastLine = line;
166 }
167 file.close();
168 } else {
169 LOG(ERROR) << "Can not open NMEA file: " << FLAGS_gnss_file_path ;
170 return;
171 }
172 }
173
ReadGnssRawMeasurement()174 void ReadGnssRawMeasurement() {
175 std::ifstream file(FLAGS_gnss_file_path);
176
177 if (file.is_open()) {
178 std::string line;
179 std::string cached_line = "";
180 std::string header = "";
181
182 while (!cached_line.empty() || std::getline(file, line)) {
183 if (!cached_line.empty()) {
184 line = cached_line;
185 cached_line = "";
186 }
187
188 // Get data header.
189 if (header.empty() && android::base::StartsWith(line, "# Raw")) {
190 header = line;
191 LOG(DEBUG) << "Header: " << header;
192 continue;
193 }
194
195 // Ignore not raw measurement data.
196 if (!android::base::StartsWith(line, "Raw")) {
197 continue;
198 }
199
200 {
201 std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
202 cached_gnss_raw = header + "\n" + line;
203
204 std::string new_line = "";
205 while (std::getline(file, new_line)) {
206 // Group raw data by TimeNanos.
207 if (getTimeNanosFromLine(new_line) ==
208 getTimeNanosFromLine(line)) {
209 cached_gnss_raw += "\n" + new_line;
210 } else {
211 cached_line = new_line;
212 break;
213 }
214 }
215 }
216 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
217 }
218 file.close();
219 } else {
220 LOG(ERROR) << "Can not open GNSS Raw file: " << FLAGS_gnss_file_path;
221 return;
222 }
223 }
224
~GnssGrpcProxyServiceImpl()225 ~GnssGrpcProxyServiceImpl() {
226 if (nmea_file_read_thread_.joinable()) {
227 nmea_file_read_thread_.join();
228 }
229 if (measurement_file_read_thread_.joinable()) {
230 measurement_file_read_thread_.join();
231 }
232 if (read_thread_.joinable()) {
233 read_thread_.join();
234 }
235 }
236
237 private:
ReadLoop()238 [[noreturn]] void ReadLoop() {
239 cuttlefish::SharedFDSet read_set;
240 read_set.Set(gnss_out_);
241 std::vector<char> buffer(GNSS_SERIAL_BUFFER_SIZE);
242 int total_read = 0;
243 std::string gnss_cmd_str;
244 int flags = gnss_out_->Fcntl(F_GETFL, 0);
245 gnss_out_->Fcntl(F_SETFL, flags | O_NONBLOCK);
246 while (true) {
247 auto bytes_read = gnss_out_->Read(buffer.data(), buffer.size());
248 if (bytes_read > 0) {
249 std::string s(buffer.data(), bytes_read);
250 gnss_cmd_str += s;
251 // In case random string sent though /dev/gnss0, gnss_cmd_str will auto resize,
252 // to get rid of first page.
253 if (gnss_cmd_str.size() > GNSS_SERIAL_BUFFER_SIZE * 2) {
254 gnss_cmd_str = gnss_cmd_str.substr(gnss_cmd_str.size() - GNSS_SERIAL_BUFFER_SIZE);
255 }
256 total_read += bytes_read;
257 if (gnss_cmd_str.find(CMD_GET_LOCATION) != std::string::npos) {
258 sendToSerial();
259 gnss_cmd_str = "";
260 total_read = 0;
261 }
262
263 if (gnss_cmd_str.find(CMD_GET_RAWMEASUREMENT) != std::string::npos) {
264 sendGnssRawToSerial();
265 gnss_cmd_str = "";
266 total_read = 0;
267 }
268 } else {
269 if (gnss_out_->GetErrno() == EAGAIN|| gnss_out_->GetErrno() == EWOULDBLOCK) {
270 std::this_thread::sleep_for(std::chrono::milliseconds(100));
271 } else {
272 LOG(ERROR) << "Error reading fd " << FLAGS_gnss_out_fd << ": "
273 << " Error code: " << gnss_out_->GetErrno()
274 << " Error sg:" << gnss_out_->StrError();
275 }
276 }
277 }
278 }
279
getTimeNanosFromLine(const std::string & line)280 std::string getTimeNanosFromLine(const std::string& line) {
281 // TimeNanos is in column #3.
282 std::vector<std::string> vals = android::base::Split(line, ",");
283 return vals.size() >= 3 ? vals[2] : "-1";
284 }
285
isGnssRawMeasurement(const std::string & inputStr)286 bool isGnssRawMeasurement(const std::string& inputStr) {
287 // TODO: add more logic check to by pass invalid data.
288 return !inputStr.empty() && android::base::StartsWith(inputStr, "# Raw");
289 }
290
isNMEA(const std::string & inputStr)291 bool isNMEA(const std::string& inputStr) {
292 return !inputStr.empty() &&
293 (android::base::StartsWith(inputStr, "$GPRMC") ||
294 android::base::StartsWith(inputStr, "$GPRMA"));
295 }
296
297 cuttlefish::SharedFD gnss_in_;
298 cuttlefish::SharedFD gnss_out_;
299 std::thread read_thread_;
300 std::thread nmea_file_read_thread_;
301 std::thread measurement_file_read_thread_;
302
303 std::string cached_nmea;
304 std::mutex cached_nmea_mutex;
305
306 std::string cached_gnss_raw;
307 std::string previous_cached_gnss_raw;
308 std::mutex cached_gnss_raw_mutex;
309 };
310
RunServer()311 void RunServer() {
312 auto gnss_in = cuttlefish::SharedFD::Dup(FLAGS_gnss_in_fd);
313 close(FLAGS_gnss_in_fd);
314 if (!gnss_in->IsOpen()) {
315 LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_in_fd << ": "
316 << gnss_in->StrError();
317 return;
318 }
319 close(FLAGS_gnss_in_fd);
320
321 auto gnss_out = cuttlefish::SharedFD::Dup(FLAGS_gnss_out_fd);
322 close(FLAGS_gnss_out_fd);
323 if (!gnss_out->IsOpen()) {
324 LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_out_fd << ": "
325 << gnss_out->StrError();
326 return;
327 }
328 auto server_address("0.0.0.0:" + std::to_string(FLAGS_gnss_grpc_port));
329 GnssGrpcProxyServiceImpl service(gnss_in, gnss_out);
330 service.StartServer();
331 if (!FLAGS_gnss_file_path.empty()) {
332 // TODO: On-demand start the read file threads according to data type.
333 service.StartReadNmeaFileThread();
334 service.StartReadGnssRawMeasurementFileThread();
335
336 // In the local mode, we are not start a grpc server, use a infinite loop instead
337 while(true) {
338 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
339 }
340 } else {
341 ServerBuilder builder;
342 // Listen on the given address without any authentication mechanism.
343 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
344 // Register "service" as the instance through which we'll communicate with
345 // clients. In this case it corresponds to an *synchronous* service.
346 builder.RegisterService(&service);
347 // Finally assemble the server.
348 std::unique_ptr<Server> server(builder.BuildAndStart());
349 std::cout << "Server listening on " << server_address << std::endl;
350
351 // Wait for the server to shutdown. Note that some other thread must be
352 // responsible for shutting down the server for this call to ever return.
353 server->Wait();
354 }
355
356 }
357
358
main(int argc,char ** argv)359 int main(int argc, char** argv) {
360 ::android::base::InitLogging(argv, android::base::StderrLogger);
361 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
362
363 LOG(DEBUG) << "Starting gnss grpc proxy server...";
364 RunServer();
365
366 return 0;
367 }